diff --git a/agent/whois_ip_agent.py b/agent/whois_ip_agent.py index 11bf0b7..0f498cd 100644 --- a/agent/whois_ip_agent.py +++ b/agent/whois_ip_agent.py @@ -105,6 +105,7 @@ def _process_dns_record(self, message: m.Message) -> None: except ( ipwhois.exceptions.IPDefinedError, ipwhois.exceptions.HTTPLookupError, + ipwhois.exceptions.ASNRegistryError, ): # Case where of the loopback address. logger.warning( diff --git a/tests/conftest.py b/tests/conftest.py index d4147e3..113e37a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -161,3 +161,15 @@ def scan_message_ipv_with_incorrect_version() -> message.Message: "version": 5, } return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_global_ipv4_with_mask32() -> message.Message: + """Creates a message of type v3.asset.ip with global IP address""" + selector = "v3.asset.ip" + msg_data = { + "host": "41.0.0.0", + "mask": "32", + "version": 4, + } + return message.Message.from_data(selector, data=msg_data) diff --git a/tests/whois_ip_agent_test.py b/tests/whois_ip_agent_test.py index 6265fa5..299f0b5 100644 --- a/tests/whois_ip_agent_test.py +++ b/tests/whois_ip_agent_test.py @@ -1,9 +1,10 @@ """Unittests for WhoisIP agent.""" from typing import List, Dict +import ipwhois +import pytest from ostorlab.agent.message import message from pytest_mock import plugin -import pytest from agent import whois_ip_agent @@ -281,3 +282,24 @@ def testWhoisIP_whenIPAssetHasIncorrectVersion_raiseValueError( """Test the CIDR Limit in case IP has incorrect version.""" with pytest.raises(ValueError, match="Incorrect ip version 5."): test_agent.process(scan_message_ipv_with_incorrect_version) + + +def testWhoisIP_whenIPHasNoASN_doesNotCrash( + test_agent: whois_ip_agent.WhoisIPAgent, + agent_mock: List[message.Message], + mocker: plugin.MockerFixture, + scan_message_global_ipv4_with_mask32: message.Message, +) -> None: + """Test the CIDR Limit in case IP has no ASN.""" + mocker.patch( + "ostorlab.agent.mixins.agent_persist_mixin.AgentPersistMixin.add_ip_network", + return_value=True, + ) + mocker.patch( + "agent.whois_ip_agent._get_whois_record", + side_effect=ipwhois.exceptions.ASNRegistryError, + ) + + test_agent.process(scan_message_global_ipv4_with_mask32) + + assert len(agent_mock) == 0