Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/Add-detection-for-CVE-2024-50379 #167

Merged
merged 5 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions agent/exploits/cve_2024_50379.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Agent Asteroid implementation for CVE-2024-50379"""

import re
import datetime
import logging
import re

import requests
from packaging import version

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit
from packaging import version

# Set up basic configuration for logging
logging.basicConfig(level=logging.DEBUG)

DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)

Expand All @@ -28,6 +31,58 @@
(version.parse("10.1.0"), version.parse("10.1.34")),
(version.parse("11.0.0"), version.parse("11.0.2")),
]
TEST_FILE = "detect.Jsp"
HARMLESS_CONTENT = "<!-- detection test -->"


def _concurrent_put(url: str) -> bool:
"""Make a PUT request to test race condition"""
try:
response = requests.put(

Check warning on line 41 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L40-L41

Added lines #L40 - L41 were not covered by tests
f"{url}/{TEST_FILE}",
data=HARMLESS_CONTENT,
headers={"Content-Type": "text/plain"},
timeout=DEFAULT_TIMEOUT.seconds,
)
if response.status_code < 200 or response.status_code >= 300:
logging.debug(

Check warning on line 48 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L47-L48

Added lines #L47 - L48 were not covered by tests
f"PUT request failed: {response.status_code} for {url}/{TEST_FILE}"
)
return 200 <= response.status_code < 300
except requests.RequestException as e:
logging.error(f"Error in PUT request: {e}")
return False

Check warning on line 54 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L51-L54

Added lines #L51 - L54 were not covered by tests


def _concurrent_get(url: str) -> bool:
"""Make a GET request to the standard extension"""
try:
response = requests.get(

Check warning on line 60 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L59-L60

Added lines #L59 - L60 were not covered by tests
f"{url}/{TEST_FILE.lower()}", timeout=DEFAULT_TIMEOUT.seconds
)
return response.status_code == 200
except requests.RequestException as e:
logging.error(f"Error in GET request: {e}")
return False

Check warning on line 66 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L63-L66

Added lines #L63 - L66 were not covered by tests


def _test_race_condition(url: str) -> bool:
"""
Test for race condition by making concurrent PUT and GET requests.
"""
# Make two PUT requests and five GET requests to simulate the race condition
put_results = [_concurrent_put(url) for _ in range(2)]
get_results = [

Check warning on line 75 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L74-L75

Added lines #L74 - L75 were not covered by tests
_concurrent_get(url) for _ in range(5)
] # More GET requests as per PoC

# Return True if any of the PUT requests succeeded and any of the GET requests succeed (indicating the race condition)
if any(put_results) and any(get_results):
logging.debug("Race condition successfully triggered for %s", url)
return True

Check warning on line 82 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L80-L82

Added lines #L80 - L82 were not covered by tests
else:
logging.debug("Race condition failed to trigger for %s", url)
return False

Check warning on line 85 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L84-L85

Added lines #L84 - L85 were not covered by tests


def _is_version_vulnerable(version_str: str) -> bool:
Expand All @@ -44,9 +99,11 @@
detected_version = version.parse(version_str)
for min_ver, max_ver in VULNERABLE_RANGES:
if min_ver <= detected_version <= max_ver:
logging.debug("Detected vulnerable version: %s", detected_version)
return True
return False
except version.InvalidVersion:
logging.error("Invalid version format: %s", version_str)

Check warning on line 106 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L106

Added line #L106 was not covered by tests
return False


Expand All @@ -63,32 +120,23 @@
target.origin, timeout=DEFAULT_TIMEOUT.seconds, verify=False
)
if not (200 <= root_response.status_code < 600 and root_response.text):
logging.debug(f"Failed to access root of {target.origin}")
return vulnerabilities

# Extract version information
version_match = VERSION_PATTERN.search(root_response.text)
if version_match is None:
logging.debug(f"Version not found for {target.origin}")
return vulnerabilities

tomcat_version = version_match.group(1)
if _is_version_vulnerable(tomcat_version) is False:
return vulnerabilities

# Test for the PUT capability that enables the race condition
test_response = requests.put(
f"{target.origin}/test.Jsp",
data="<!-- test -->",
timeout=DEFAULT_TIMEOUT.seconds,
verify=False,
)
if _test_race_condition(target.origin) is True:
vulnerabilities.append(self._create_vulnerability(target))

Check warning on line 132 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L132

Added line #L132 was not covered by tests

if (
test_response.status_code in (201, 204)
or _is_version_vulnerable(tomcat_version) is True
):
vulnerabilities = [self._create_vulnerability(target)]
tomcat_version = version_match.group(1)
if _is_version_vulnerable(tomcat_version) is True:
vulnerabilities.append(self._create_vulnerability(target))

except requests.RequestException:
except requests.RequestException as e:
logging.error(f"Error in checking {target.origin}: {e}")
return vulnerabilities

return vulnerabilities
Expand Down
72 changes: 39 additions & 33 deletions tests/exploits/cve_2024_50379_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Unit tests for Agent Asteroid: CVE-2024-50379"""

from unittest import mock

import requests
import requests_mock as req_mock

from agent import definitions
from agent.exploits import cve_2024_50379

Expand All @@ -19,28 +22,24 @@ def testCVE202450379_whenVulnerable_reportFinding(
status_code=200,
)

# Mock the PUT request test
requests_mock.put(
"http://localhost:80/test.Jsp",
status_code=201,
)
with mock.patch(
"agent.exploits.cve_2024_50379._test_race_condition",
new_callable=mock.AsyncMock,
) as mock_test:
mock_test.return_value = True

target = definitions.Target("http", "localhost", 80)
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)
accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) > 0
vulnerability = vulnerabilities[0]
assert (
vulnerability.entry.title
== "Apache Tomcat Race Condition Remote Code Execution"
)
assert (
vulnerability.technical_detail
== "http://localhost:80 is vulnerable to CVE-2024-50379, Apache Tomcat Race Condition Remote Code Execution"
)
assert accept is True
assert len(vulnerabilities) > 0
vulnerability = vulnerabilities[0]
assert (
vulnerability.entry.title
== "Apache Tomcat Race Condition Remote Code Execution"
)


def testCVE202450379_whenSafeVersion_reportNothing(
Expand All @@ -55,13 +54,19 @@ def testCVE202450379_whenSafeVersion_reportNothing(
status_code=200,
)

target = definitions.Target("http", "localhost", 80)
with mock.patch(
"agent.exploits.cve_2024_50379._test_race_condition",
new_callable=mock.AsyncMock,
) as mock_test:
mock_test.return_value = False

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)
target = definitions.Target("http", "localhost", 80)

assert accept is True
assert len(vulnerabilities) == 0
accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) == 0


def testCVE202450379_whenNoTomcat_reportNothing(
Expand All @@ -82,27 +87,28 @@ def testCVE202450379_whenNoTomcat_reportNothing(
assert accept is False


def testCVE202450379_whenPutNotAllowed_reportNothing(
def testCVE202450379_whenRaceConditionFails_reportNothing(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when PUT requests are not allowed."""
"""CVE-2024-50379 unit test: case when race condition test fails."""
exploit_instance = cve_2024_50379.CVE202450379Exploit()

requests_mock.get(
"http://localhost:80/",
status_code=200,
)

requests_mock.put(
"http://localhost:80/test.Jsp",
status_code=403,
)
with mock.patch(
"agent.exploits.cve_2024_50379._test_race_condition",
new_callable=mock.AsyncMock,
) as mock_test:
mock_test.return_value = False

target = definitions.Target("http", "localhost", 80)
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)
vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0
assert len(vulnerabilities) == 0


def testCVE202450379_whenConnectionError_reportNothing(
Expand Down
Loading