Skip to content

Commit

Permalink
Merge pull request #61 from Ostorlab/fix/emit_ip_service
Browse files Browse the repository at this point in the history
Emit service for IPv4 and IPv6.
  • Loading branch information
3asm authored Sep 20, 2024
2 parents cd5167c + 3b6e19a commit be43857
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 9 deletions.
30 changes: 24 additions & 6 deletions agent/whatweb_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@

WHATWEB_PATH = "./whatweb"
WHATWEB_DIRECTORY = "/WhatWeb"
LIB_SELECTOR = "v3.fingerprint.domain_name.service.library"
DOMAIN_NAME_LIB_SELECTOR = "v3.fingerprint.domain_name.service.library"
IP_V4_LIB_SELECTOR = "v3.fingerprint.ip.v4.service.library"
IP_V6_LIB_SELECTOR = "v3.fingerprint.ip.v6.service.library"
SCHEME_TO_PORT = {"http": 80, "https": 443}
IPV4_CIDR_LIMIT = 16
IPV6_CIDR_LIMIT = 112
Expand Down Expand Up @@ -462,7 +464,13 @@ def _send_detected_fingerprints(
msg_data = self._get_msg_data(
target, library_name, version, fingerprint_type
)
self.emit(selector=LIB_SELECTOR, data=msg_data)
if isinstance(target, DomainTarget):
self.emit(selector=DOMAIN_NAME_LIB_SELECTOR, data=msg_data)
elif isinstance(target, IPTarget) and target.version == 4:
self.emit(selector=IP_V4_LIB_SELECTOR, data=msg_data)
elif isinstance(target, IPTarget) and target.version == 6:
self.emit(selector=IP_V6_LIB_SELECTOR, data=msg_data)

self.report_vulnerability(
entry=kb.Entry(
title=VULNZ_TITLE,
Expand All @@ -485,7 +493,13 @@ def _send_detected_fingerprints(
else:
# No version is found.
msg_data = self._get_msg_data(target, library_name, None, fingerprint_type)
self.emit(selector=LIB_SELECTOR, data=msg_data)
if isinstance(target, DomainTarget):
self.emit(selector=DOMAIN_NAME_LIB_SELECTOR, data=msg_data)
elif isinstance(target, IPTarget) and target.version == 4:
self.emit(selector=IP_V4_LIB_SELECTOR, data=msg_data)
elif isinstance(target, IPTarget) and target.version == 6:
self.emit(selector=IP_V6_LIB_SELECTOR, data=msg_data)

self.report_vulnerability(
entry=kb.Entry(
title=VULNZ_TITLE,
Expand Down Expand Up @@ -516,11 +530,15 @@ def _get_msg_data(
"""Prepare data of the library proto message to be emited."""
msg_data: Dict[str, Any] = {}
if target.name is not None:
msg_data["name"] = target.name
if isinstance(target, DomainTarget):
msg_data["name"] = target.name
msg_data["schema"] = target.schema
elif isinstance(target, IPTarget):
msg_data["host"] = target.name
msg_data["version"] = target.version
msg_data["mask"] = "32" if target.version == 4 else "128"
if target.port is not None:
msg_data["port"] = target.port
if target.schema is not None:
msg_data["schema"] = target.schema
if library_name is not None:
msg_data["library_name"] = library_name
if fingerprint_type is not None:
Expand Down
2 changes: 2 additions & 0 deletions ostorlab.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ in_selectors:
- v3.asset.ip
out_selectors:
- v3.fingerprint.domain_name.service.library
- v3.fingerprint.ip.v4.service.library
- v3.fingerprint.ip.v6.service.library
- v3.report.vulnerability
docker_file_path: Dockerfile
docker_build_root: .
Expand Down
9 changes: 9 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ def ip_msg() -> m.Message:
return message


@pytest.fixture
def ipv6_msg() -> m.Message:
"""Creates a dummy message of type v3.asset.ip for testing purposes."""
input_selector = "v3.asset.ip.v6"
input_data = {"host": "2a00:1450:4006:80c::2004", "version": 6}
message = m.Message.from_data(selector=input_selector, data=input_data)
return message


@pytest.fixture
def ip_msg_with_port_and_schema() -> m.Message:
"""Creates a dummy message of type v3.asset.ip.v4.port.service for testing purposes."""
Expand Down
60 changes: 57 additions & 3 deletions tests/whatweb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,61 @@ def testWhatWebAgent_withIpMsgAndAllChecksEnabled_emitsFingerprints(
for fingerprint_msg in agent_mock
)
assert any(
fingerprint_msg.data.get("schema") == "https"
fingerprint_msg.data.get("host") == "192.168.0.76"
for fingerprint_msg in agent_mock
)
assert any(
fingerprint_msg.data.get("library_name") == "lighttpd/1.4.28"
for fingerprint_msg in agent_mock
)
assert any(
fingerprint_msg.data.get("library_version") == "1.4.28"
for fingerprint_msg in agent_mock
)
assert any(
vuln_msg.data.get("title") == "Tech Stack Fingerprint"
for vuln_msg in agent_mock
)
assert any(
vuln_msg.data.get("risk_rating") == "INFO" for vuln_msg in agent_mock
)
assert any(
vuln_msg.data.get("technical_detail") == detail
for vuln_msg in agent_mock
)
assert any(
vuln_msg.data.get("security_issue") is True for vuln_msg in agent_mock
)


def testWhatWebAgent_withIpv6MsgAndAllChecksEnabled_emitsFingerprints(
agent_mock: List[message.Message],
whatweb_test_agent: whatweb_agent.AgentWhatWeb,
ipv6_msg: message.Message,
mocker: plugin.MockerFixture,
) -> None:
"""Test the whatweb agent with a given target IPv6 address. The tests mocks the call to WhatWeb binary
and validates the parsing and sending the findings to the queue.
"""
detail = (
"Found fingerprint `lighttpd`, version `1.4.28`, of type `BACKEND_COMPONENT` in target "
"`2a00:1450:4006:80c::2004`"
)

mocker.patch("subprocess.run", return_value=None)
with tempfile.TemporaryFile() as fp:
mocker.patch("tempfile.NamedTemporaryFile", return_value=fp)
with open(f"{pathlib.Path(__file__).parent}/ip_output.json", "rb") as op:
fp.write(op.read())
fp.seek(0)
whatweb_test_agent.process(ipv6_msg)
assert len(agent_mock) > 0
assert any(
fingerprint_msg.data.get("port") == 443
for fingerprint_msg in agent_mock
)
assert any(
fingerprint_msg.data.get("host") == "2a00:1450:4006:80c::2004"
for fingerprint_msg in agent_mock
)
assert any(
Expand Down Expand Up @@ -253,7 +307,7 @@ def testWhatWebAgent_whenIpMsgHasPortAndSchema_emitsFingerprints(
fingerprint_msg.data.get("port") == 80 for fingerprint_msg in agent_mock
)
assert any(
fingerprint_msg.data.get("schema") == "http"
fingerprint_msg.data.get("host") == "192.168.0.0"
for fingerprint_msg in agent_mock
)
assert any(
Expand Down Expand Up @@ -326,7 +380,7 @@ def testWhatWebAgent_whenIpMsgHasPortAndSchemaAndMask_emitsFingerprints(
fingerprint_msg.data.get("port") == 80 for fingerprint_msg in agent_mock
)
assert any(
fingerprint_msg.data.get("schema") == "http"
fingerprint_msg.data.get("host") == "192.168.0.0"
for fingerprint_msg in agent_mock
)
assert any(
Expand Down

0 comments on commit be43857

Please sign in to comment.