diff --git a/WHATSNEW.md b/WHATSNEW.md index c1101b1..757a99d 100644 --- a/WHATSNEW.md +++ b/WHATSNEW.md @@ -1,8 +1,10 @@ ### Added - Option `--raw` for `fmg get devices` +- Support for HTTP `PATCH` and `DELETE` method in FortiClientEMS - Option `--smtp` for `fgt config check` + ### Changed - `fmg get devices` also shows ha nodes if device is a cluster diff --git a/fotoobo/fortinet/forticlientems.py b/fotoobo/fortinet/forticlientems.py index d047146..e8cefe9 100644 --- a/fotoobo/fortinet/forticlientems.py +++ b/fotoobo/fortinet/forticlientems.py @@ -1,8 +1,10 @@ """ FortiClient EMS Class """ + import logging import pickle +import re from pathlib import Path from typing import Any, Dict, Optional @@ -20,6 +22,8 @@ class FortiClientEMS(Fortinet): Represents one FortiClient EMS (digital twin) """ + ALLOWED_HTTP_METHODS = ["DELETE", "GET", "PATCH", "POST"] + def __init__( self, hostname: str, @@ -41,9 +45,9 @@ def __init__( super().__init__(hostname, **kwargs) self.api_url = f"https://{self.hostname}:{self.https_port}/api/v1" self.cookie_path = cookie_path - self.password = password - self.username = username - self.type = "forticlientems" + self.password: str = password + self.username: str = username + self.type: str = "forticlientems" def api( # pylint: disable=too-many-arguments self, @@ -68,6 +72,9 @@ def api( # pylint: disable=too-many-arguments Returns: Response from the request """ + if not headers: + headers = self.session.headers # type: ignore + return super().api( method, url, payload=payload, params=params, timeout=timeout, headers=headers ) @@ -109,15 +116,19 @@ def login(self) -> int: """ status = 401 cookie = Path(self.cookie_path).expanduser() / f"{self.hostname}.cookie" + csrf = Path(self.cookie_path).expanduser() / f"{self.hostname}.csrf" if self.cookie_path: - log.debug("Searching cookie in '%s'", cookie) + log.debug("Searching cookie and csrf token in '%s'", cookie.parents[0]) - if cookie.is_file(): - log.debug("Cookie exists. Skipping login") + if cookie.is_file() and csrf.is_file(): + log.debug("Cookie and csrf token both exist") with cookie.open("rb") as cookie_file: self.session.cookies.update(pickle.load(cookie_file)) # type: ignore + self.session.headers["Referer"] = f"https://{self.hostname}" + self.session.headers["X-CSRFToken"] = csrf.read_text() + try: response = self.api("get", "/system/serial_number") if ( @@ -135,22 +146,36 @@ def login(self) -> int: status = err.code else: - log.debug("No cookie found for '%s'", self.hostname) + log.debug("No cookie or csrf token found for '%s'", self.hostname) if status == 401: log.debug("Login to '%s'", self.hostname) payload = {"name": self.username, "password": self.password} response = self.api("post", "/auth/signin", payload=payload) - if response.status_code == 200 and self.cookie_path: - log.debug("Saving cookie for '%s'", self.hostname) - - try: - with cookie.open("wb") as cookie_file: - pickle.dump(self.session.cookies, cookie_file) - - except FileNotFoundError: - log.warning("Unable to save cookie file '%s'", str(cookie.resolve())) + if response.status_code == 200: + self.session.headers["Referer"] = f"https://{self.hostname}" + if match := re.match(r"csrftoken=(\S+);", response.headers["Set-Cookie"]): + csrf_token = match.group(1) + self.session.headers["X-CSRFToken"] = csrf_token + + if self.cookie_path: + log.debug("Saving cookie for '%s'", self.hostname) + try: + with cookie.open("wb") as cookie_file: + pickle.dump(self.session.cookies, cookie_file) + + except FileNotFoundError as exc: + log.debug(exc) + log.warning("Unable to save cookie file '%s'", str(cookie.resolve())) + + log.debug("Saving csrf token for '%s'", self.hostname) + try: + csrf.write_text(csrf_token) + + except NameError as exc: + log.debug(exc) + log.warning("Unable to save csrf token file '%s'", str(csrf.resolve())) status = response.status_code diff --git a/fotoobo/fortinet/fortinet.py b/fotoobo/fortinet/fortinet.py index e2c0c46..2082f58 100644 --- a/fotoobo/fortinet/fortinet.py +++ b/fotoobo/fortinet/fortinet.py @@ -23,6 +23,9 @@ class Fortinet(ABC): defined here with the abstractmethod decorator. """ + # Use the ALLOWED_HTTP_METHODS class constant to define the supported HTTP methods. By default + # we should support GET and POST but you may override this list of supported methods in every + # subclass. Treat this setting as a constant which must not be redefined during runtime. ALLOWED_HTTP_METHODS = ["GET", "POST"] def __init__(self, hostname: str, **kwargs: Any) -> None: @@ -94,7 +97,7 @@ def api( # pylint: disable=too-many-arguments timeout = timeout or self.timeout start = time() - if method.upper() not in Fortinet.ALLOWED_HTTP_METHODS: + if method.upper() not in self.ALLOWED_HTTP_METHODS: error = f"HTTP method '{method.upper()}' is not implemented" log.error(error) raise NotImplementedError(error) diff --git a/tests/data/ems_dummy.csrf b/tests/data/ems_dummy.csrf new file mode 100644 index 0000000..6b24a30 --- /dev/null +++ b/tests/data/ems_dummy.csrf @@ -0,0 +1 @@ +dummy_csrf_token_from_cache diff --git a/tests/data/host.cookie b/tests/data/host.cookie deleted file mode 100644 index b8408d9..0000000 Binary files a/tests/data/host.cookie and /dev/null differ diff --git a/tests/fortinet/test_forticlientems.py b/tests/fortinet/test_forticlientems.py index e0ec1a1..8111bef 100644 --- a/tests/fortinet/test_forticlientems.py +++ b/tests/fortinet/test_forticlientems.py @@ -2,7 +2,8 @@ """ Test the FortiClient EMS class """ -from unittest.mock import MagicMock +from pathlib import Path +from unittest.mock import ANY, MagicMock import pytest import requests @@ -23,13 +24,17 @@ def test_login_without_cookie(monkeypatch: MonkeyPatch) -> None: "fotoobo.fortinet.forticlientems.requests.Session.post", MagicMock( return_value=ResponseMock( - json={"result": {"retval": 1, "message": "Login successful."}}, status=200 + headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"}, + json={"result": {"retval": 1, "message": "Login successful."}}, + status=200, ) ), ) - ems = FortiClientEMS("host", "dummy_user", "dummy_pass", ssl_verify=False) - assert ems.api_url == "https://host:443/api/v1" + ems = FortiClientEMS("ems_dummy", "dummy_user", "dummy_pass", ssl_verify=False) + assert ems.api_url == "https://ems_dummy:443/api/v1" assert ems.login() == 200 + assert ems.session.headers["Referer"] == "https://ems_dummy" + assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token" @staticmethod def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None: @@ -42,12 +47,16 @@ def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None: ) ), ) - ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False) - assert ems.api_url == "https://host:443/api/v1" + ems = FortiClientEMS( + "ems_dummy", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False + ) + assert ems.api_url == "https://ems_dummy:443/api/v1" assert ems.login() == 200 + assert ems.session.headers["Referer"] == "https://ems_dummy" + assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token_from_cache\n" @staticmethod - def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None: + def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch, temp_dir: Path) -> None: """Test the login to a FortiClient EMS with no session cookie given""" monkeypatch.setattr( "fotoobo.fortinet.forticlientems.requests.Session.get", @@ -67,6 +76,7 @@ def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None: "fotoobo.fortinet.forticlientems.requests.Session.post", MagicMock( return_value=ResponseMock( + headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"}, json={ "result": {"retval": 1, "message": "Login successful."}, }, @@ -74,17 +84,24 @@ def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None: ), ), ) - ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False) + source = Path("tests/data/ems_dummy.cookie") + destination = Path(temp_dir / "ems_dummy.cookie") + destination.write_bytes(source.read_bytes()) + source = Path("tests/data/ems_dummy.csrf") + destination = Path(temp_dir / "ems_dummy.csrf") + destination.write_bytes(source.read_bytes()) + ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False) assert ems.api_url == "https://host:443/api/v1" assert ems.login() == 200 @staticmethod - def test_login_with_invalid_cookie_path(temp_dir: str, monkeypatch: MonkeyPatch) -> None: + def test_login_with_invalid_cookie_path(temp_dir: Path, monkeypatch: MonkeyPatch) -> None: """Test the login to a FortiClient EMS with an invalid cookie path""" monkeypatch.setattr( "fotoobo.fortinet.fortinet.requests.Session.post", MagicMock( return_value=ResponseMock( + headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"}, json={ "result": {"retval": 1, "message": "Login successful."}, }, @@ -92,8 +109,27 @@ def test_login_with_invalid_cookie_path(temp_dir: str, monkeypatch: MonkeyPatch) ) ), ) - ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False) - assert ems.api_url == "https://host:443/api/v1" + ems = FortiClientEMS("host_1", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False) + assert ems.api_url == "https://host_1:443/api/v1" + assert ems.login() == 200 + + @staticmethod + def test_login_with_csrf_token_not_found(temp_dir: Path, monkeypatch: MonkeyPatch) -> None: + """Test the login to a FortiClient EMS when the csrf token was not found in the headers""" + monkeypatch.setattr( + "fotoobo.fortinet.fortinet.requests.Session.post", + MagicMock( + return_value=ResponseMock( + headers={"Set-Cookie": "csrftoken_missing=dummy_csrf_token;"}, + json={ + "result": {"retval": 1, "message": "Login successful."}, + }, + status=200, + ) + ), + ) + ems = FortiClientEMS("host_2", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False) + assert ems.api_url == "https://host_2:443/api/v1" assert ems.login() == 200 @staticmethod @@ -143,7 +179,7 @@ def test_get_version_ok(monkeypatch: MonkeyPatch) -> None: response = ems.get_version() requests.Session.get.assert_called_with( "https://host:443/api/v1/system/consts/get?system_update_time=1", - headers=None, + headers=ANY, json=None, params=None, timeout=3, @@ -167,7 +203,7 @@ def test_get_version_invalid(monkeypatch: MonkeyPatch) -> None: assert "Did not find any FortiClient EMS version number in response" in str(err.value) requests.Session.get.assert_called_with( "https://host:443/api/v1/system/consts/get?system_update_time=1", - headers=None, + headers=ANY, json=None, params=None, timeout=3, diff --git a/tests/helper.py b/tests/helper.py index 558785a..5d7e1da 100644 --- a/tests/helper.py +++ b/tests/helper.py @@ -21,6 +21,7 @@ def __init__(self, **kwargs: Any) -> None: status (int, optional): http status code. Defaults to 444 (No Response) """ self.text = kwargs.get("text", "") + self.headers = kwargs.get("headers", []) self.json = MagicMock(return_value=kwargs.get("json", None)) self.status_code = kwargs.get("status", 444) self.ok = kwargs.get("ok", True)