From 3bd5ab22a3ace43e8268e1e4f2f1f69f4e19e1f4 Mon Sep 17 00:00:00 2001
From: Patrik Spiess <patrik.spiess@mgb.ch>
Date: Fri, 23 Feb 2024 10:32:47 +0100
Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=91=94=20Add=20HTTP=20methods=20`PATC?=
 =?UTF-8?q?H`=20and=20`DELETE`?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 WHATSNEW.md                           |   1 +
 fotoobo/fortinet/forticlientems.py    |  57 ++++++++++++++++-------
 tests/data/ems_dummy.csrf             |   1 +
 tests/data/host.cookie                | Bin 419 -> 0 bytes
 tests/fortinet/test_forticlientems.py |  62 ++++++++++++++++++++------
 tests/helper.py                       |   1 +
 6 files changed, 93 insertions(+), 29 deletions(-)
 create mode 100644 tests/data/ems_dummy.csrf
 delete mode 100644 tests/data/host.cookie

diff --git a/WHATSNEW.md b/WHATSNEW.md
index 5fe1a5c..e438bad 100644
--- a/WHATSNEW.md
+++ b/WHATSNEW.md
@@ -1,6 +1,7 @@
 ### Added
 
 - Option `--raw` for `fmg get devices`
+- Support for HTTP `PATCH` and `DELETE` method in FortiClientEMS
 
 ### Changed
 
diff --git a/fotoobo/fortinet/forticlientems.py b/fotoobo/fortinet/forticlientems.py
index d047146..e8cefe9 100644
--- a/fotoobo/fortinet/forticlientems.py
+++ b/fotoobo/fortinet/forticlientems.py
@@ -1,8 +1,10 @@
 """
 FortiClient EMS Class
 """
+
 import logging
 import pickle
+import re
 from pathlib import Path
 from typing import Any, Dict, Optional
 
@@ -20,6 +22,8 @@ class FortiClientEMS(Fortinet):
     Represents one FortiClient EMS (digital twin)
     """
 
+    ALLOWED_HTTP_METHODS = ["DELETE", "GET", "PATCH", "POST"]
+
     def __init__(
         self,
         hostname: str,
@@ -41,9 +45,9 @@ def __init__(
         super().__init__(hostname, **kwargs)
         self.api_url = f"https://{self.hostname}:{self.https_port}/api/v1"
         self.cookie_path = cookie_path
-        self.password = password
-        self.username = username
-        self.type = "forticlientems"
+        self.password: str = password
+        self.username: str = username
+        self.type: str = "forticlientems"
 
     def api(  # pylint: disable=too-many-arguments
         self,
@@ -68,6 +72,9 @@ def api(  # pylint: disable=too-many-arguments
         Returns:
             Response from the request
         """
+        if not headers:
+            headers = self.session.headers  # type: ignore
+
         return super().api(
             method, url, payload=payload, params=params, timeout=timeout, headers=headers
         )
@@ -109,15 +116,19 @@ def login(self) -> int:
         """
         status = 401
         cookie = Path(self.cookie_path).expanduser() / f"{self.hostname}.cookie"
+        csrf = Path(self.cookie_path).expanduser() / f"{self.hostname}.csrf"
 
         if self.cookie_path:
-            log.debug("Searching cookie in '%s'", cookie)
+            log.debug("Searching cookie and csrf token in '%s'", cookie.parents[0])
 
-            if cookie.is_file():
-                log.debug("Cookie exists. Skipping login")
+            if cookie.is_file() and csrf.is_file():
+                log.debug("Cookie and csrf token both exist")
                 with cookie.open("rb") as cookie_file:
                     self.session.cookies.update(pickle.load(cookie_file))  # type: ignore
 
+                self.session.headers["Referer"] = f"https://{self.hostname}"
+                self.session.headers["X-CSRFToken"] = csrf.read_text()
+
                 try:
                     response = self.api("get", "/system/serial_number")
                     if (
@@ -135,22 +146,36 @@ def login(self) -> int:
                     status = err.code
 
             else:
-                log.debug("No cookie found for '%s'", self.hostname)
+                log.debug("No cookie or csrf token found for '%s'", self.hostname)
 
         if status == 401:
             log.debug("Login to '%s'", self.hostname)
             payload = {"name": self.username, "password": self.password}
             response = self.api("post", "/auth/signin", payload=payload)
 
-            if response.status_code == 200 and self.cookie_path:
-                log.debug("Saving cookie for '%s'", self.hostname)
-
-                try:
-                    with cookie.open("wb") as cookie_file:
-                        pickle.dump(self.session.cookies, cookie_file)
-
-                except FileNotFoundError:
-                    log.warning("Unable to save cookie file '%s'", str(cookie.resolve()))
+            if response.status_code == 200:
+                self.session.headers["Referer"] = f"https://{self.hostname}"
+                if match := re.match(r"csrftoken=(\S+);", response.headers["Set-Cookie"]):
+                    csrf_token = match.group(1)
+                    self.session.headers["X-CSRFToken"] = csrf_token
+
+                if self.cookie_path:
+                    log.debug("Saving cookie for '%s'", self.hostname)
+                    try:
+                        with cookie.open("wb") as cookie_file:
+                            pickle.dump(self.session.cookies, cookie_file)
+
+                    except FileNotFoundError as exc:
+                        log.debug(exc)
+                        log.warning("Unable to save cookie file '%s'", str(cookie.resolve()))
+
+                    log.debug("Saving csrf token for '%s'", self.hostname)
+                    try:
+                        csrf.write_text(csrf_token)
+
+                    except NameError as exc:
+                        log.debug(exc)
+                        log.warning("Unable to save csrf token file '%s'", str(csrf.resolve()))
 
             status = response.status_code
 
diff --git a/tests/data/ems_dummy.csrf b/tests/data/ems_dummy.csrf
new file mode 100644
index 0000000..6b24a30
--- /dev/null
+++ b/tests/data/ems_dummy.csrf
@@ -0,0 +1 @@
+dummy_csrf_token_from_cache
diff --git a/tests/data/host.cookie b/tests/data/host.cookie
deleted file mode 100644
index b8408d9a9a4dfc3e73cc33d8135f6de5b33070b2..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 419
zcmY+A&q@O^5XLJi?LlR=;6)$6UWyhJde)Od5nmu=v$Je$Ox8?d@gV3$T$q~?d|b2L
z(Ar!k{JwAgzVH0}ZuORrG7HZh(BW7ZGnasY4j&S_UXojG!*>{cgtu@;eO4PK#cQC0
z%DFlvJw^)+Z=mE}xi0Izv#k;b8XO8<L-?Y;EyZMdbr~8Ow%?2MDdWgijv4J$vH;di
zG>Nu-bXE$-3iHIJj^HGLREc^00#=ri=c?^+oMergN<-ONNyYAZbh?HJj!7+@<SKSk
zDsI-$(WYK=S4Em32ycPa);J@Kil^Icj}qhd0x^6B%2=+9m_v~!$1sYFD`gh{*v#iN
LNZVpOv(LwWJtwC~

diff --git a/tests/fortinet/test_forticlientems.py b/tests/fortinet/test_forticlientems.py
index e0ec1a1..8111bef 100644
--- a/tests/fortinet/test_forticlientems.py
+++ b/tests/fortinet/test_forticlientems.py
@@ -2,7 +2,8 @@
 """
 Test the FortiClient EMS class
 """
-from unittest.mock import MagicMock
+from pathlib import Path
+from unittest.mock import ANY, MagicMock
 
 import pytest
 import requests
@@ -23,13 +24,17 @@ def test_login_without_cookie(monkeypatch: MonkeyPatch) -> None:
             "fotoobo.fortinet.forticlientems.requests.Session.post",
             MagicMock(
                 return_value=ResponseMock(
-                    json={"result": {"retval": 1, "message": "Login successful."}}, status=200
+                    headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
+                    json={"result": {"retval": 1, "message": "Login successful."}},
+                    status=200,
                 )
             ),
         )
-        ems = FortiClientEMS("host", "dummy_user", "dummy_pass", ssl_verify=False)
-        assert ems.api_url == "https://host:443/api/v1"
+        ems = FortiClientEMS("ems_dummy", "dummy_user", "dummy_pass", ssl_verify=False)
+        assert ems.api_url == "https://ems_dummy:443/api/v1"
         assert ems.login() == 200
+        assert ems.session.headers["Referer"] == "https://ems_dummy"
+        assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token"
 
     @staticmethod
     def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None:
@@ -42,12 +47,16 @@ def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None:
                 )
             ),
         )
-        ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False)
-        assert ems.api_url == "https://host:443/api/v1"
+        ems = FortiClientEMS(
+            "ems_dummy", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False
+        )
+        assert ems.api_url == "https://ems_dummy:443/api/v1"
         assert ems.login() == 200
+        assert ems.session.headers["Referer"] == "https://ems_dummy"
+        assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token_from_cache\n"
 
     @staticmethod
-    def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
+    def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch, temp_dir: Path) -> None:
         """Test the login to a FortiClient EMS with no session cookie given"""
         monkeypatch.setattr(
             "fotoobo.fortinet.forticlientems.requests.Session.get",
@@ -67,6 +76,7 @@ def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
             "fotoobo.fortinet.forticlientems.requests.Session.post",
             MagicMock(
                 return_value=ResponseMock(
+                    headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
                     json={
                         "result": {"retval": 1, "message": "Login successful."},
                     },
@@ -74,17 +84,24 @@ def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
                 ),
             ),
         )
-        ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False)
+        source = Path("tests/data/ems_dummy.cookie")
+        destination = Path(temp_dir / "ems_dummy.cookie")
+        destination.write_bytes(source.read_bytes())
+        source = Path("tests/data/ems_dummy.csrf")
+        destination = Path(temp_dir / "ems_dummy.csrf")
+        destination.write_bytes(source.read_bytes())
+        ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
         assert ems.api_url == "https://host:443/api/v1"
         assert ems.login() == 200
 
     @staticmethod
-    def test_login_with_invalid_cookie_path(temp_dir: str, monkeypatch: MonkeyPatch) -> None:
+    def test_login_with_invalid_cookie_path(temp_dir: Path, monkeypatch: MonkeyPatch) -> None:
         """Test the login to a FortiClient EMS with an invalid cookie path"""
         monkeypatch.setattr(
             "fotoobo.fortinet.fortinet.requests.Session.post",
             MagicMock(
                 return_value=ResponseMock(
+                    headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
                     json={
                         "result": {"retval": 1, "message": "Login successful."},
                     },
@@ -92,8 +109,27 @@ def test_login_with_invalid_cookie_path(temp_dir: str, monkeypatch: MonkeyPatch)
                 )
             ),
         )
-        ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
-        assert ems.api_url == "https://host:443/api/v1"
+        ems = FortiClientEMS("host_1", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
+        assert ems.api_url == "https://host_1:443/api/v1"
+        assert ems.login() == 200
+
+    @staticmethod
+    def test_login_with_csrf_token_not_found(temp_dir: Path, monkeypatch: MonkeyPatch) -> None:
+        """Test the login to a FortiClient EMS when the csrf token was not found in the headers"""
+        monkeypatch.setattr(
+            "fotoobo.fortinet.fortinet.requests.Session.post",
+            MagicMock(
+                return_value=ResponseMock(
+                    headers={"Set-Cookie": "csrftoken_missing=dummy_csrf_token;"},
+                    json={
+                        "result": {"retval": 1, "message": "Login successful."},
+                    },
+                    status=200,
+                )
+            ),
+        )
+        ems = FortiClientEMS("host_2", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
+        assert ems.api_url == "https://host_2:443/api/v1"
         assert ems.login() == 200
 
     @staticmethod
@@ -143,7 +179,7 @@ def test_get_version_ok(monkeypatch: MonkeyPatch) -> None:
         response = ems.get_version()
         requests.Session.get.assert_called_with(
             "https://host:443/api/v1/system/consts/get?system_update_time=1",
-            headers=None,
+            headers=ANY,
             json=None,
             params=None,
             timeout=3,
@@ -167,7 +203,7 @@ def test_get_version_invalid(monkeypatch: MonkeyPatch) -> None:
         assert "Did not find any FortiClient EMS version number in response" in str(err.value)
         requests.Session.get.assert_called_with(
             "https://host:443/api/v1/system/consts/get?system_update_time=1",
-            headers=None,
+            headers=ANY,
             json=None,
             params=None,
             timeout=3,
diff --git a/tests/helper.py b/tests/helper.py
index 558785a..5d7e1da 100644
--- a/tests/helper.py
+++ b/tests/helper.py
@@ -21,6 +21,7 @@ def __init__(self, **kwargs: Any) -> None:
             status (int, optional): http status code. Defaults to 444 (No Response)
         """
         self.text = kwargs.get("text", "")
+        self.headers = kwargs.get("headers", [])
         self.json = MagicMock(return_value=kwargs.get("json", None))
         self.status_code = kwargs.get("status", 444)
         self.ok = kwargs.get("ok", True)

From 8473f0e01ba1ab956758b62f1d00792bb1f3a622 Mon Sep 17 00:00:00 2001
From: Patrik Spiess <patrik.spiess@mgb.ch>
Date: Wed, 28 Feb 2024 08:26:06 +0100
Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=90=9B=20Fix=20accessing=20class=20co?=
 =?UTF-8?q?nstant=20and=20add=20some=20explanation?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fotoobo/fortinet/fortinet.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/fotoobo/fortinet/fortinet.py b/fotoobo/fortinet/fortinet.py
index e2c0c46..2082f58 100644
--- a/fotoobo/fortinet/fortinet.py
+++ b/fotoobo/fortinet/fortinet.py
@@ -23,6 +23,9 @@ class Fortinet(ABC):
     defined here with the abstractmethod decorator.
     """
 
+    # Use the ALLOWED_HTTP_METHODS class constant to define the supported HTTP methods. By default
+    # we should support GET and POST but you may override this list of supported methods in every
+    # subclass. Treat this setting as a constant which must not be redefined during runtime.
     ALLOWED_HTTP_METHODS = ["GET", "POST"]
 
     def __init__(self, hostname: str, **kwargs: Any) -> None:
@@ -94,7 +97,7 @@ def api(  # pylint: disable=too-many-arguments
         timeout = timeout or self.timeout
         start = time()
 
-        if method.upper() not in Fortinet.ALLOWED_HTTP_METHODS:
+        if method.upper() not in self.ALLOWED_HTTP_METHODS:
             error = f"HTTP method '{method.upper()}' is not implemented"
             log.error(error)
             raise NotImplementedError(error)