Skip to content

Commit

Permalink
Merge pull request #37 from Ostorlab/feat_update_cidr_limit
Browse files Browse the repository at this point in the history
Feat : Add CIDR Limit
  • Loading branch information
benyissa authored Dec 19, 2023
2 parents fb6b769 + 84cad05 commit 13e0f48
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 32 deletions.
56 changes: 26 additions & 30 deletions agent/whatweb_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
WHATWEB_DIRECTORY = "/WhatWeb"
LIB_SELECTOR = "v3.fingerprint.domain_name.service.library"
SCHEME_TO_PORT = {"http": 80, "https": 443}
IPV4_CIDR_LIMIT = 16
IPV6_CIDR_LIMIT = 112


class BaseTarget(abc.ABC):
Expand Down Expand Up @@ -218,39 +220,33 @@ def _prepare_ip_targets(self, message: msg.Message) -> List[IPTarget]:
targets: List[IPTarget] = []
host = message.data.get("host")
mask = message.data.get("mask")

if host is None:
return targets

if mask is not None:
try:
addresses = ipaddress.ip_network(f"{host}/{mask}", strict=False)
for address in addresses.hosts():
targets.append(
IPTarget(
name=str(address),
version=address.version,
schema=self._get_schema(message),
port=self._get_port(message),
)
)
except ValueError as e:
logger.error("Invalid IP or mask. %s", e)
if mask is None:
network = ipaddress.ip_network(f"{host}")
else:
try:
addresses = ipaddress.ip_network(host, strict=False)
for address in addresses.hosts():
targets.append(
IPTarget(
name=str(address),
version=address.version,
schema=self._get_schema(message),
port=self._get_port(message),
)
)
except ValueError as e:
logger.error("Invalid IP. %s", e)

version = message.data.get("version")
if version not in (4, 6):
raise ValueError(f"Incorrect ip version {version}.")
elif version == 4 and int(mask) < IPV4_CIDR_LIMIT:
raise ValueError(
f"Subnet mask below {IPV4_CIDR_LIMIT} is not supported."
)
elif version == 6 and int(mask) < IPV6_CIDR_LIMIT:
raise ValueError(
f"Subnet mask below {IPV6_CIDR_LIMIT} is not supported."
)
network = ipaddress.ip_network(f"{host}/{mask}", strict=False)

for address in network.hosts():
targets.append(
IPTarget(
name=str(address),
version=address.version,
schema=self._get_schema(message),
port=self._get_port(message),
)
)
return targets

def _is_domain_in_scope(
Expand Down
2 changes: 1 addition & 1 deletion ostorlab.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
kind: Agent
name: whatweb
version: 0.1.15
version: 0.1.16
image: images/logo.png
description: |
This repository is an implementation of [Ostorlab Agent](https://pypi.org/project/ostorlab/) for the [WhatWeb Fingerprinter](https://github.com/urbanadventurer/WhatWeb.git).
Expand Down
77 changes: 76 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import pathlib
from typing import Dict, Union
import random

from ostorlab.agent import definitions as agent_definitions
from ostorlab.runtimes import definitions as runtime_definitions
Expand Down Expand Up @@ -61,7 +62,13 @@ def ip_msg_with_port_and_schema() -> m.Message:
def ip_msg_with_port_schema_mask() -> m.Message:
"""Creates a dummy message of type v3.asset.ip.v4.port.service for testing purposes."""
input_selector = "v3.asset.ip.v4.port.service"
input_data = {"host": "192.168.0.0", "port": 80, "mask": "32", "protocol": "http"}
input_data = {
"host": "192.168.0.0",
"port": 80,
"mask": "32",
"protocol": "http",
"version": 4,
}
message = m.Message.from_data(selector=input_selector, data=input_data)
return message

Expand Down Expand Up @@ -124,3 +131,71 @@ def whatweb_agent_with_scope_arg(
],
)
return whatweb_agent.AgentWhatWeb(agent_definition, agent_settings)


@pytest.fixture()
def test_agent() -> whatweb_agent.AgentWhatWeb:
"""WhatWeb Agent fixture for testing purposes."""
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
agent_definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
agent_settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/whatweb",
bus_url="NA",
bus_exchange_topic="NA",
redis_url="redis://redis",
args=[],
healthcheck_port=random.randint(4000, 5000),
)
return whatweb_agent.AgentWhatWeb(agent_definition, agent_settings)


@pytest.fixture()
def scan_message_ipv4_with_mask8() -> m.Message:
"""Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v4"
msg_data = {"host": "192.168.1.17", "mask": "8", "version": 4}
return m.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv4_with_mask16() -> m.Message:
"""Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v4"
msg_data = {"host": "192.168.1.17", "mask": "16", "version": 4}
return m.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv6_with_mask64() -> m.Message:
"""Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v6"
msg_data = {
"host": "2001:db8:3333:4444:5555:6666:7777:8888",
"mask": "64",
"version": 6,
}
return m.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv6_with_mask112() -> m.Message:
"""Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v6"
msg_data = {
"host": "2001:db8:3333:4444:5555:6666:7777:8888",
"mask": "112",
"version": 6,
}
return m.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv_with_incorrect_version() -> m.Message:
"""Creates a message of type v3.asset.ip with an incorrect version."""
selector = "v3.asset.ip"
msg_data = {
"host": "0.0.0.0",
"mask": "32",
"version": 5,
}
return m.Message.from_data(selector, data=msg_data)
47 changes: 47 additions & 0 deletions tests/whatweb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ostorlab.agent.message import message
from pytest_mock import plugin
import pytest

from agent import whatweb_agent

Expand Down Expand Up @@ -521,3 +522,49 @@ def testWhatWebAgent_withUnsupportedSchema_targetShouldNotBeScanned(
# assert
assert len(agent_mock) == 0
assert subprocess_mock.call_count == 0


def testWhatWebAgent_whenIPv4AssetDoesNotReachCIDRLimit_doesNotRaiseValueError(
test_agent: whatweb_agent.AgentWhatWeb,
mocker: plugin.MockerFixture,
scan_message_ipv4_with_mask16: message.Message,
) -> None:
"""Test the CIDR Limit in case IPV4 and the Limit is not reached."""
mocker.patch(
"ostorlab.agent.mixins.agent_persist_mixin.AgentPersistMixin.add_ip_network",
return_value=False,
)

test_agent.process(scan_message_ipv4_with_mask16)


def testWhatWebAgent_whenIPv6AssetReachCIDRLimit_raiseValueError(
test_agent: whatweb_agent.AgentWhatWeb,
scan_message_ipv6_with_mask64: message.Message,
) -> None:
"""Test the CIDR Limit in case IPV6 and the Limit is reached."""
with pytest.raises(ValueError, match="Subnet mask below 112 is not supported."):
test_agent.process(scan_message_ipv6_with_mask64)


def testWhatWebAgent_whenIPv6AssetDoesNotReachCIDRLimit_doesNotRaiseValueError(
test_agent: whatweb_agent.AgentWhatWeb,
mocker: plugin.MockerFixture,
scan_message_ipv6_with_mask112: message.Message,
) -> None:
"""Test the CIDR Limit in case IPV6 and the Limit is not reached."""
mocker.patch(
"ostorlab.agent.mixins.agent_persist_mixin.AgentPersistMixin.add_ip_network",
return_value=False,
)

test_agent.process(scan_message_ipv6_with_mask112)


def testWhatWebAgent_whenIPAssetHasIncorrectVersion_raiseValueError(
test_agent: whatweb_agent.AgentWhatWeb,
scan_message_ipv_with_incorrect_version: message.Message,
) -> None:
"""Test the CIDR Limit in case IP has incorrect version."""
with pytest.raises(ValueError, match="Incorrect ip version 5."):
test_agent.process(scan_message_ipv_with_incorrect_version)

0 comments on commit 13e0f48

Please sign in to comment.