From 0b7244201092d1c7ad0553c4dc0d82fc49df9c4f Mon Sep 17 00:00:00 2001 From: Alaeddine Mesbahi Date: Fri, 20 Sep 2024 19:04:01 +0100 Subject: [PATCH 1/2] Emit service for IPv4 and IPv6. --- agent/whatweb_agent.py | 30 ++++++++++++++++++++++++------ ostorlab.yaml | 2 ++ tests/whatweb_test.py | 6 +++--- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/agent/whatweb_agent.py b/agent/whatweb_agent.py index 6caab19..25e39c8 100644 --- a/agent/whatweb_agent.py +++ b/agent/whatweb_agent.py @@ -76,7 +76,9 @@ WHATWEB_PATH = "./whatweb" WHATWEB_DIRECTORY = "/WhatWeb" -LIB_SELECTOR = "v3.fingerprint.domain_name.service.library" +DOMAIN_NAME_LIB_SELECTOR = "v3.fingerprint.domain_name.service.library" +IP_V4_LIB_SELECTOR = "v3.fingerprint.ip.v4.service.library" +IP_V6_LIB_SELECTOR = "v3.fingerprint.ip.v4.service.library" SCHEME_TO_PORT = {"http": 80, "https": 443} IPV4_CIDR_LIMIT = 16 IPV6_CIDR_LIMIT = 112 @@ -462,7 +464,13 @@ def _send_detected_fingerprints( msg_data = self._get_msg_data( target, library_name, version, fingerprint_type ) - self.emit(selector=LIB_SELECTOR, data=msg_data) + if isinstance(target, DomainTarget): + self.emit(selector=DOMAIN_NAME_LIB_SELECTOR, data=msg_data) + elif isinstance(target, IPTarget) and target.version == 4: + self.emit(selector=IP_V4_LIB_SELECTOR, data=msg_data) + elif isinstance(target, IPTarget) and target.version == 6: + self.emit(selector=IP_V6_LIB_SELECTOR, data=msg_data) + self.report_vulnerability( entry=kb.Entry( title=VULNZ_TITLE, @@ -485,7 +493,13 @@ def _send_detected_fingerprints( else: # No version is found. msg_data = self._get_msg_data(target, library_name, None, fingerprint_type) - self.emit(selector=LIB_SELECTOR, data=msg_data) + if isinstance(target, DomainTarget): + self.emit(selector=DOMAIN_NAME_LIB_SELECTOR, data=msg_data) + elif isinstance(target, IPTarget) and target.version == 4: + self.emit(selector=IP_V4_LIB_SELECTOR, data=msg_data) + elif isinstance(target, IPTarget) and target.version == 6: + self.emit(selector=IP_V6_LIB_SELECTOR, data=msg_data) + self.report_vulnerability( entry=kb.Entry( title=VULNZ_TITLE, @@ -516,11 +530,15 @@ def _get_msg_data( """Prepare data of the library proto message to be emited.""" msg_data: Dict[str, Any] = {} if target.name is not None: - msg_data["name"] = target.name + if isinstance(target, DomainTarget): + msg_data["name"] = target.name + msg_data["schema"] = target.schema + elif isinstance(target, IPTarget): + msg_data["host"] = target.name + msg_data["version"] = target.version + msg_data["mask"] = "32" if target.version == 4 else "128" if target.port is not None: msg_data["port"] = target.port - if target.schema is not None: - msg_data["schema"] = target.schema if library_name is not None: msg_data["library_name"] = library_name if fingerprint_type is not None: diff --git a/ostorlab.yaml b/ostorlab.yaml index 7fc9168..7bdfa70 100644 --- a/ostorlab.yaml +++ b/ostorlab.yaml @@ -75,6 +75,8 @@ in_selectors: - v3.asset.ip out_selectors: - v3.fingerprint.domain_name.service.library + - v3.fingerprint.ip.v4.service.library + - v3.fingerprint.ip.v6.service.library - v3.report.vulnerability docker_file_path: Dockerfile docker_build_root: . diff --git a/tests/whatweb_test.py b/tests/whatweb_test.py index 9d87948..aafe9f0 100644 --- a/tests/whatweb_test.py +++ b/tests/whatweb_test.py @@ -200,7 +200,7 @@ def testWhatWebAgent_withIpMsgAndAllChecksEnabled_emitsFingerprints( for fingerprint_msg in agent_mock ) assert any( - fingerprint_msg.data.get("schema") == "https" + fingerprint_msg.data.get("host") == "192.168.0.76" for fingerprint_msg in agent_mock ) assert any( @@ -253,7 +253,7 @@ def testWhatWebAgent_whenIpMsgHasPortAndSchema_emitsFingerprints( fingerprint_msg.data.get("port") == 80 for fingerprint_msg in agent_mock ) assert any( - fingerprint_msg.data.get("schema") == "http" + fingerprint_msg.data.get("host") == "192.168.0.0" for fingerprint_msg in agent_mock ) assert any( @@ -326,7 +326,7 @@ def testWhatWebAgent_whenIpMsgHasPortAndSchemaAndMask_emitsFingerprints( fingerprint_msg.data.get("port") == 80 for fingerprint_msg in agent_mock ) assert any( - fingerprint_msg.data.get("schema") == "http" + fingerprint_msg.data.get("host") == "192.168.0.0" for fingerprint_msg in agent_mock ) assert any( From 3b6e19a26bc79184c85f0aec9baf7fb34ba023c5 Mon Sep 17 00:00:00 2001 From: Alaeddine Mesbahi Date: Fri, 20 Sep 2024 19:27:30 +0100 Subject: [PATCH 2/2] Emit service for IPv4 and IPv6. --- agent/whatweb_agent.py | 2 +- tests/conftest.py | 9 +++++++ tests/whatweb_test.py | 54 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/agent/whatweb_agent.py b/agent/whatweb_agent.py index 25e39c8..a53cb14 100644 --- a/agent/whatweb_agent.py +++ b/agent/whatweb_agent.py @@ -78,7 +78,7 @@ WHATWEB_DIRECTORY = "/WhatWeb" DOMAIN_NAME_LIB_SELECTOR = "v3.fingerprint.domain_name.service.library" IP_V4_LIB_SELECTOR = "v3.fingerprint.ip.v4.service.library" -IP_V6_LIB_SELECTOR = "v3.fingerprint.ip.v4.service.library" +IP_V6_LIB_SELECTOR = "v3.fingerprint.ip.v6.service.library" SCHEME_TO_PORT = {"http": 80, "https": 443} IPV4_CIDR_LIMIT = 16 IPV6_CIDR_LIMIT = 112 diff --git a/tests/conftest.py b/tests/conftest.py index d30d920..f846bbe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,6 +50,15 @@ def ip_msg() -> m.Message: return message +@pytest.fixture +def ipv6_msg() -> m.Message: + """Creates a dummy message of type v3.asset.ip for testing purposes.""" + input_selector = "v3.asset.ip.v6" + input_data = {"host": "2a00:1450:4006:80c::2004", "version": 6} + message = m.Message.from_data(selector=input_selector, data=input_data) + return message + + @pytest.fixture def ip_msg_with_port_and_schema() -> m.Message: """Creates a dummy message of type v3.asset.ip.v4.port.service for testing purposes.""" diff --git a/tests/whatweb_test.py b/tests/whatweb_test.py index aafe9f0..179139e 100644 --- a/tests/whatweb_test.py +++ b/tests/whatweb_test.py @@ -227,6 +227,60 @@ def testWhatWebAgent_withIpMsgAndAllChecksEnabled_emitsFingerprints( ) +def testWhatWebAgent_withIpv6MsgAndAllChecksEnabled_emitsFingerprints( + agent_mock: List[message.Message], + whatweb_test_agent: whatweb_agent.AgentWhatWeb, + ipv6_msg: message.Message, + mocker: plugin.MockerFixture, +) -> None: + """Test the whatweb agent with a given target IPv6 address. The tests mocks the call to WhatWeb binary + and validates the parsing and sending the findings to the queue. + """ + detail = ( + "Found fingerprint `lighttpd`, version `1.4.28`, of type `BACKEND_COMPONENT` in target " + "`2a00:1450:4006:80c::2004`" + ) + + mocker.patch("subprocess.run", return_value=None) + with tempfile.TemporaryFile() as fp: + mocker.patch("tempfile.NamedTemporaryFile", return_value=fp) + with open(f"{pathlib.Path(__file__).parent}/ip_output.json", "rb") as op: + fp.write(op.read()) + fp.seek(0) + whatweb_test_agent.process(ipv6_msg) + assert len(agent_mock) > 0 + assert any( + fingerprint_msg.data.get("port") == 443 + for fingerprint_msg in agent_mock + ) + assert any( + fingerprint_msg.data.get("host") == "2a00:1450:4006:80c::2004" + for fingerprint_msg in agent_mock + ) + assert any( + fingerprint_msg.data.get("library_name") == "lighttpd/1.4.28" + for fingerprint_msg in agent_mock + ) + assert any( + fingerprint_msg.data.get("library_version") == "1.4.28" + for fingerprint_msg in agent_mock + ) + assert any( + vuln_msg.data.get("title") == "Tech Stack Fingerprint" + for vuln_msg in agent_mock + ) + assert any( + vuln_msg.data.get("risk_rating") == "INFO" for vuln_msg in agent_mock + ) + assert any( + vuln_msg.data.get("technical_detail") == detail + for vuln_msg in agent_mock + ) + assert any( + vuln_msg.data.get("security_issue") is True for vuln_msg in agent_mock + ) + + def testWhatWebAgent_whenIpMsgHasPortAndSchema_emitsFingerprints( agent_mock: List[message.Message], whatweb_test_agent: whatweb_agent.AgentWhatWeb,