Skip to content

Commit

Permalink
IPInfo pack for detection engine (#617)
Browse files Browse the repository at this point in the history
* IPInfo pack for detection engine

* use sourceIPs for EKS
  • Loading branch information
debugmiller authored Jan 10, 2023
1 parent 9b1d3fb commit 8214016
Show file tree
Hide file tree
Showing 6 changed files with 914 additions and 0 deletions.
186 changes: 186 additions & 0 deletions global_helpers/global_helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
sys.path.append(os.path.dirname(__file__))

import panther_base_helpers as p_b_h # pylint: disable=C0413
import panther_ipinfo_helpers as p_i_h # pylint: disable=C0413
import panther_tor_helpers as p_tor_h # pylint: disable=C0413


Expand Down Expand Up @@ -227,5 +228,190 @@ def test_context(self):
)


class TestIpInfoHelpersLocation(unittest.TestCase):
def setUp(self):
self.match_field = "clientIp"
self.event = {
"p_enrichment": {
p_i_h.IPINFO_LOCATION_LUT_NAME: {
self.match_field: {
"city": "Constantinople",
"country": "Byzantium",
"lat": "41.008610",
"lng": "28.971111",
"postal_code": "NA",
"region": "Asia Minor",
"region_code": "123",
"timezone": "GMT+03:00",
}
}
}
}
self.ip_info = p_i_h.get_ipinfo_location(self.event)

def test_city(self):
city = self.ip_info.city(self.match_field)
self.assertEqual(city, "Constantinople")

def test_country(self):
country = self.ip_info.country(self.match_field)
self.assertEqual(country, "Byzantium")

def test_latitude(self):
latitude = self.ip_info.latitude(self.match_field)
self.assertEqual(latitude, "41.008610")

def test_longitude(self):
longitude = self.ip_info.longitude(self.match_field)
self.assertEqual(longitude, "28.971111")

def test_postal_code(self):
postal_code = self.ip_info.postal_code(self.match_field)
self.assertEqual(postal_code, "NA")

def test_region(self):
region = self.ip_info.region(self.match_field)
self.assertEqual(region, "Asia Minor")

def test_region_code(self):
region_code = self.ip_info.region_code(self.match_field)
self.assertEqual(region_code, "123")

def test_timezone(self):
timezone = self.ip_info.timezone(self.match_field)
self.assertEqual(timezone, "GMT+03:00")

def test_not_found(self):
self.assertEqual(self.ip_info.timezone("not_found"), None)

def test_context(self):
expected = {
"City": "Constantinople",
"Country": "Byzantium",
"Latitude": "41.008610",
"Longitude": "28.971111",
"PostalCode": "NA",
"Region": "Asia Minor",
"RegionCode": "123",
"Timezone": "GMT+03:00",
}
self.assertEqual(expected, self.ip_info.context(self.match_field))


class TestIpInfoHelpersASN(unittest.TestCase):
def setUp(self):
self.match_field = "clientIp"
self.event = {
"p_enrichment": {
p_i_h.IPINFO_ASN_LUT_NAME: {
self.match_field: {
"asn": "AS00000",
"domain": "byzantineempire.com",
"name": "Byzantine Empire",
"route": "1.2.3.4/24",
"type": "isp",
}
}
}
}
self.ip_info = p_i_h.get_ipinfo_asn(self.event)

def test_asn(self):
asn = self.ip_info.asn(self.match_field)
self.assertEqual(asn, "AS00000")

def test_domain(self):
domain = self.ip_info.domain(self.match_field)
self.assertEqual(domain, "byzantineempire.com")

def test_name(self):
name = self.ip_info.name(self.match_field)
self.assertEqual(name, "Byzantine Empire")

def test_route(self):
route = self.ip_info.route(self.match_field)
self.assertEqual(route, "1.2.3.4/24")

def test_type(self):
_type = self.ip_info.type(self.match_field)
self.assertEqual(_type, "isp")

def test_not_found(self):
self.assertEqual(self.ip_info.type("not_found"), None)

def test_context(self):
expected = {
"ASN": "AS00000",
"Domain": "byzantineempire.com",
"Name": "Byzantine Empire",
"Route": "1.2.3.4/24",
"Type": "isp",
}
self.assertEqual(expected, self.ip_info.context(self.match_field))


class TestGeoInfoFromIP(unittest.TestCase):
def setUp(self):
self.match_field = "clientIp"
self.event = {
"p_enrichment": {
p_i_h.IPINFO_ASN_LUT_NAME: {
self.match_field: {
"asn": "AS00000",
"domain": "byzantineempire.com",
"name": "Byzantine Empire",
"route": "1.2.3.4/24",
"type": "isp",
}
},
p_i_h.IPINFO_LOCATION_LUT_NAME: {
self.match_field: {
"city": "Constantinople",
"country": "Byzantium",
"lat": "41.008610",
"lng": "28.971111",
"postal_code": "NA",
"region": "Asia Minor",
"region_code": "123",
"timezone": "GMT+03:00",
}
},
},
self.match_field: "1.2.3.4",
}

def test_geoinfo(self):
geoinfo = p_i_h.geoinfo_from_ip(self.event, self.match_field)
expected = {
"city": "Constantinople",
"country": "Byzantium",
"ip": "1.2.3.4",
"loc": "41.008610,28.971111",
"org": "AS00000 Byzantine Empire",
"postal": "NA",
"region": "Asia Minor",
"timezone": "GMT+03:00",
}
self.assertEqual(expected, geoinfo)

def test_ipinfo_not_enabled_exception(self):
event = {"p_enrichment": {}}
with self.assertRaises(p_i_h.PantherIPInfoException) as exc:
p_i_h.geoinfo_from_ip(event, "fake_field")

self.assertEqual(
exc.exception.args[0], "Please enable both IPInfo Location and ASN Enrichment Providers"
)

def test_ipinfo_missing_match_exception(self):
with self.assertRaises(p_i_h.PantherIPInfoException) as exc:
p_i_h.geoinfo_from_ip(self.event, "fake_field")

self.assertEqual(
exc.exception.args[0],
"IPInfo is not configured on the provided match_field: fake_field",
)


if __name__ == "__main__":
unittest.main()
132 changes: 132 additions & 0 deletions global_helpers/panther_ipinfo_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from panther_base_helpers import deep_get

IPINFO_LOCATION_LUT_NAME = "ipinfo_location"
IPINFO_ASN_LUT_NAME = "ipinfo_asn"


class PantherIPInfoException(Exception):
...


class IPInfoLocation:
"""Helper to get IPInfo location information for enriched fields"""

def __init__(self, event):
self.ipinfo_location = deep_get(event, "p_enrichment", IPINFO_LOCATION_LUT_NAME)
self.event = event

def city(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "city")

def country(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "country")

def latitude(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "lat")

def longitude(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "lng")

def postal_code(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "postal_code")

def region(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "region")

def region_code(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "region_code")

def timezone(self, match_field) -> str:
return deep_get(self.ipinfo_location, match_field, "timezone")

def context(self, match_field) -> object:
return {
"City": self.city(match_field),
"Country": self.country(match_field),
"Latitude": self.latitude(match_field),
"Longitude": self.longitude(match_field),
"PostalCode": self.postal_code(match_field),
"Region": self.region(match_field),
"RegionCode": self.region_code(match_field),
"Timezone": self.timezone(match_field),
}


class IPInfoASN:
"""Helper to get IPInfo ASN information for enriched fields"""

def __init__(self, event):
self.ipinfo_asn = deep_get(event, "p_enrichment", IPINFO_ASN_LUT_NAME)
self.event = event

def asn(self, match_field) -> str:
return deep_get(self.ipinfo_asn, match_field, "asn")

def domain(self, match_field) -> str:
return deep_get(self.ipinfo_asn, match_field, "domain")

def name(self, match_field) -> str:
return deep_get(self.ipinfo_asn, match_field, "name")

def route(self, match_field) -> str:
return deep_get(self.ipinfo_asn, match_field, "route")

def type(self, match_field) -> str:
return deep_get(self.ipinfo_asn, match_field, "type")

def context(self, match_field) -> object:
return {
"ASN": self.asn(match_field),
"Domain": self.domain(match_field),
"Name": self.name(match_field),
"Route": self.route(match_field),
"Type": self.type(match_field),
}


def get_ipinfo_location(event):
"""Returns an IPInfoLocation object for the event or None if it is not available"""
if deep_get(event, "p_enrichment", IPINFO_LOCATION_LUT_NAME):
return IPInfoLocation(event)
return None


def get_ipinfo_asn(event):
"""Returns an IPInfoASN object for the event or None if it is not available"""
if deep_get(event, "p_enrichment", IPINFO_ASN_LUT_NAME):
return IPInfoASN(event)
return None


def geoinfo_from_ip(event, match_field):
"""Returns a dictionary with geolocation information that is the same format as
panther_oss_helper.geoinfo_from_ip() with the following differences:
- instead of poviding the ip, you must provide the event and the match_field
- the fields "hostname" and "anycast" are not included in the return object
"""
location = get_ipinfo_location(event)
asn = get_ipinfo_asn(event)
if location is None or asn is None:
raise PantherIPInfoException(
"Please enable both IPInfo Location and ASN Enrichment Providers"
)

if (
deep_get(asn.ipinfo_asn, match_field) is None
or deep_get(location.ipinfo_location, match_field) is None
):
raise PantherIPInfoException(
f"IPInfo is not configured on the provided match_field: {match_field}"
)

return {
"ip": event.get(match_field),
"city": location.city(match_field),
"region": location.region(match_field),
"country": location.country(match_field),
"loc": f"{location.latitude(match_field)},{location.longitude(match_field)}",
"org": f"{asn.asn(match_field)} {asn.name(match_field)}",
"postal": location.postal_code(match_field),
"timezone": location.timezone(match_field),
}
4 changes: 4 additions & 0 deletions global_helpers/panther_ipinfo_helpers.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
AnalysisType: global
Filename: panther_ipinfo_helpers.py
GlobalID: panther_ipinfo_helpers
Description: Used to simplify the use of IPInfo Data in Rules
Loading

0 comments on commit 8214016

Please sign in to comment.