From e8072d632e9e70921ad60eaaa7748defd542fdec Mon Sep 17 00:00:00 2001 From: Mohamed Benchikh Date: Tue, 27 Feb 2024 10:23:50 +0100 Subject: [PATCH 1/3] Handle the case when IP has no ASN --- agent/whois_ip_agent.py | 1 + tests/conftest.py | 12 ++++++++++++ tests/whois_ip_agent_test.py | 24 +++++++++++++++++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/agent/whois_ip_agent.py b/agent/whois_ip_agent.py index 11bf0b7..0f498cd 100644 --- a/agent/whois_ip_agent.py +++ b/agent/whois_ip_agent.py @@ -105,6 +105,7 @@ def _process_dns_record(self, message: m.Message) -> None: except ( ipwhois.exceptions.IPDefinedError, ipwhois.exceptions.HTTPLookupError, + ipwhois.exceptions.ASNRegistryError, ): # Case where of the loopback address. logger.warning( diff --git a/tests/conftest.py b/tests/conftest.py index d4147e3..113e37a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -161,3 +161,15 @@ def scan_message_ipv_with_incorrect_version() -> message.Message: "version": 5, } return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_global_ipv4_with_mask32() -> message.Message: + """Creates a message of type v3.asset.ip with global IP address""" + selector = "v3.asset.ip" + msg_data = { + "host": "41.0.0.0", + "mask": "32", + "version": 4, + } + return message.Message.from_data(selector, data=msg_data) diff --git a/tests/whois_ip_agent_test.py b/tests/whois_ip_agent_test.py index 6265fa5..92e459e 100644 --- a/tests/whois_ip_agent_test.py +++ b/tests/whois_ip_agent_test.py @@ -1,9 +1,10 @@ """Unittests for WhoisIP agent.""" from typing import List, Dict +import ipwhois +import pytest from ostorlab.agent.message import message from pytest_mock import plugin -import pytest from agent import whois_ip_agent @@ -281,3 +282,24 @@ def testWhoisIP_whenIPAssetHasIncorrectVersion_raiseValueError( """Test the CIDR Limit in case IP has incorrect version.""" with pytest.raises(ValueError, match="Incorrect ip version 5."): test_agent.process(scan_message_ipv_with_incorrect_version) + + +def testWhoisIP_whenIPHasNotASN_doesNotCrash( + test_agent: whois_ip_agent.WhoisIPAgent, + agent_mock: List[message.Message], + mocker: plugin.MockerFixture, + scan_message_global_ipv4_with_mask16: message.Message, +) -> None: + """Test the CIDR Limit in case IP has no ASN.""" + mocker.patch( + "ostorlab.agent.mixins.agent_persist_mixin.AgentPersistMixin.add_ip_network", + return_value=True, + ) + mocker.patch( + "agent.whois_ip_agent._get_whois_record", + side_effect=ipwhois.exceptions.ASNRegistryError, + ) + + test_agent.process(scan_message_global_ipv4_with_mask16) + + assert len(agent_mock) == 0 From 1abee33abba0caa98983d977b283f0df6de7542d Mon Sep 17 00:00:00 2001 From: Mohamed Benchikh Date: Tue, 27 Feb 2024 10:24:42 +0100 Subject: [PATCH 2/3] Correct typo --- tests/whois_ip_agent_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/whois_ip_agent_test.py b/tests/whois_ip_agent_test.py index 92e459e..f970722 100644 --- a/tests/whois_ip_agent_test.py +++ b/tests/whois_ip_agent_test.py @@ -284,7 +284,7 @@ def testWhoisIP_whenIPAssetHasIncorrectVersion_raiseValueError( test_agent.process(scan_message_ipv_with_incorrect_version) -def testWhoisIP_whenIPHasNotASN_doesNotCrash( +def testWhoisIP_whenIPHasNoASN_doesNotCrash( test_agent: whois_ip_agent.WhoisIPAgent, agent_mock: List[message.Message], mocker: plugin.MockerFixture, From 00bafc1b5d97ad128e2a73d21441fb22a9edff60 Mon Sep 17 00:00:00 2001 From: Mohamed Benchikh Date: Tue, 27 Feb 2024 10:45:45 +0100 Subject: [PATCH 3/3] Correct fixture name --- tests/whois_ip_agent_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/whois_ip_agent_test.py b/tests/whois_ip_agent_test.py index f970722..299f0b5 100644 --- a/tests/whois_ip_agent_test.py +++ b/tests/whois_ip_agent_test.py @@ -288,7 +288,7 @@ def testWhoisIP_whenIPHasNoASN_doesNotCrash( test_agent: whois_ip_agent.WhoisIPAgent, agent_mock: List[message.Message], mocker: plugin.MockerFixture, - scan_message_global_ipv4_with_mask16: message.Message, + scan_message_global_ipv4_with_mask32: message.Message, ) -> None: """Test the CIDR Limit in case IP has no ASN.""" mocker.patch( @@ -300,6 +300,6 @@ def testWhoisIP_whenIPHasNoASN_doesNotCrash( side_effect=ipwhois.exceptions.ASNRegistryError, ) - test_agent.process(scan_message_global_ipv4_with_mask16) + test_agent.process(scan_message_global_ipv4_with_mask32) assert len(agent_mock) == 0