Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for http patch and delete method in forti client ems #196 #197

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions WHATSNEW.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
### Added

- Option `--raw` for `fmg get devices`
- Support for HTTP `PATCH` and `DELETE` method in FortiClientEMS
- Option `--smtp` for `fgt config check`


### Changed

- `fmg get devices` also shows ha nodes if device is a cluster
Expand Down
57 changes: 41 additions & 16 deletions fotoobo/fortinet/forticlientems.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""
FortiClient EMS Class
"""

import logging
import pickle
import re
from pathlib import Path
from typing import Any, Dict, Optional

Expand All @@ -20,6 +22,8 @@ class FortiClientEMS(Fortinet):
Represents one FortiClient EMS (digital twin)
"""

ALLOWED_HTTP_METHODS = ["DELETE", "GET", "PATCH", "POST"]

def __init__(
self,
hostname: str,
Expand All @@ -41,9 +45,9 @@ def __init__(
super().__init__(hostname, **kwargs)
self.api_url = f"https://{self.hostname}:{self.https_port}/api/v1"
self.cookie_path = cookie_path
self.password = password
self.username = username
self.type = "forticlientems"
self.password: str = password
self.username: str = username
self.type: str = "forticlientems"

def api( # pylint: disable=too-many-arguments
self,
Expand All @@ -68,6 +72,9 @@ def api( # pylint: disable=too-many-arguments
Returns:
Response from the request
"""
if not headers:
headers = self.session.headers # type: ignore

return super().api(
method, url, payload=payload, params=params, timeout=timeout, headers=headers
)
Expand Down Expand Up @@ -109,15 +116,19 @@ def login(self) -> int:
"""
status = 401
cookie = Path(self.cookie_path).expanduser() / f"{self.hostname}.cookie"
csrf = Path(self.cookie_path).expanduser() / f"{self.hostname}.csrf"

if self.cookie_path:
log.debug("Searching cookie in '%s'", cookie)
log.debug("Searching cookie and csrf token in '%s'", cookie.parents[0])

if cookie.is_file():
log.debug("Cookie exists. Skipping login")
if cookie.is_file() and csrf.is_file():
log.debug("Cookie and csrf token both exist")
with cookie.open("rb") as cookie_file:
self.session.cookies.update(pickle.load(cookie_file)) # type: ignore

self.session.headers["Referer"] = f"https://{self.hostname}"
self.session.headers["X-CSRFToken"] = csrf.read_text()

try:
response = self.api("get", "/system/serial_number")
if (
Expand All @@ -135,22 +146,36 @@ def login(self) -> int:
status = err.code

else:
log.debug("No cookie found for '%s'", self.hostname)
log.debug("No cookie or csrf token found for '%s'", self.hostname)

if status == 401:
log.debug("Login to '%s'", self.hostname)
payload = {"name": self.username, "password": self.password}
response = self.api("post", "/auth/signin", payload=payload)

if response.status_code == 200 and self.cookie_path:
log.debug("Saving cookie for '%s'", self.hostname)

try:
with cookie.open("wb") as cookie_file:
pickle.dump(self.session.cookies, cookie_file)

except FileNotFoundError:
log.warning("Unable to save cookie file '%s'", str(cookie.resolve()))
if response.status_code == 200:
self.session.headers["Referer"] = f"https://{self.hostname}"
if match := re.match(r"csrftoken=(\S+);", response.headers["Set-Cookie"]):
csrf_token = match.group(1)
self.session.headers["X-CSRFToken"] = csrf_token

if self.cookie_path:
log.debug("Saving cookie for '%s'", self.hostname)
try:
with cookie.open("wb") as cookie_file:
pickle.dump(self.session.cookies, cookie_file)

except FileNotFoundError as exc:
log.debug(exc)
log.warning("Unable to save cookie file '%s'", str(cookie.resolve()))

log.debug("Saving csrf token for '%s'", self.hostname)
try:
csrf.write_text(csrf_token)

except NameError as exc:
log.debug(exc)
log.warning("Unable to save csrf token file '%s'", str(csrf.resolve()))

status = response.status_code

Expand Down
5 changes: 4 additions & 1 deletion fotoobo/fortinet/fortinet.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class Fortinet(ABC):
defined here with the abstractmethod decorator.
"""

# Use the ALLOWED_HTTP_METHODS class constant to define the supported HTTP methods. By default
# we should support GET and POST but you may override this list of supported methods in every
# subclass. Treat this setting as a constant which must not be redefined during runtime.
ALLOWED_HTTP_METHODS = ["GET", "POST"]

def __init__(self, hostname: str, **kwargs: Any) -> None:
Expand Down Expand Up @@ -94,7 +97,7 @@ def api( # pylint: disable=too-many-arguments
timeout = timeout or self.timeout
start = time()

if method.upper() not in Fortinet.ALLOWED_HTTP_METHODS:
if method.upper() not in self.ALLOWED_HTTP_METHODS:
error = f"HTTP method '{method.upper()}' is not implemented"
log.error(error)
raise NotImplementedError(error)
Expand Down
1 change: 1 addition & 0 deletions tests/data/ems_dummy.csrf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dummy_csrf_token_from_cache
Binary file removed tests/data/host.cookie
Binary file not shown.
62 changes: 49 additions & 13 deletions tests/fortinet/test_forticlientems.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"""
Test the FortiClient EMS class
"""
from unittest.mock import MagicMock
from pathlib import Path
from unittest.mock import ANY, MagicMock

import pytest
import requests
Expand All @@ -23,13 +24,17 @@ def test_login_without_cookie(monkeypatch: MonkeyPatch) -> None:
"fotoobo.fortinet.forticlientems.requests.Session.post",
MagicMock(
return_value=ResponseMock(
json={"result": {"retval": 1, "message": "Login successful."}}, status=200
headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
json={"result": {"retval": 1, "message": "Login successful."}},
status=200,
)
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
ems = FortiClientEMS("ems_dummy", "dummy_user", "dummy_pass", ssl_verify=False)
assert ems.api_url == "https://ems_dummy:443/api/v1"
assert ems.login() == 200
assert ems.session.headers["Referer"] == "https://ems_dummy"
assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token"

@staticmethod
def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None:
Expand All @@ -42,12 +47,16 @@ def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None:
)
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
ems = FortiClientEMS(
"ems_dummy", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False
)
assert ems.api_url == "https://ems_dummy:443/api/v1"
assert ems.login() == 200
assert ems.session.headers["Referer"] == "https://ems_dummy"
assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token_from_cache\n"

@staticmethod
def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch, temp_dir: Path) -> None:
"""Test the login to a FortiClient EMS with no session cookie given"""
monkeypatch.setattr(
"fotoobo.fortinet.forticlientems.requests.Session.get",
Expand All @@ -67,33 +76,60 @@ def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
"fotoobo.fortinet.forticlientems.requests.Session.post",
MagicMock(
return_value=ResponseMock(
headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
json={
"result": {"retval": 1, "message": "Login successful."},
},
status=200,
),
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False)
source = Path("tests/data/ems_dummy.cookie")
destination = Path(temp_dir / "ems_dummy.cookie")
destination.write_bytes(source.read_bytes())
source = Path("tests/data/ems_dummy.csrf")
destination = Path(temp_dir / "ems_dummy.csrf")
destination.write_bytes(source.read_bytes())
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
assert ems.login() == 200

@staticmethod
def test_login_with_invalid_cookie_path(temp_dir: str, monkeypatch: MonkeyPatch) -> None:
def test_login_with_invalid_cookie_path(temp_dir: Path, monkeypatch: MonkeyPatch) -> None:
"""Test the login to a FortiClient EMS with an invalid cookie path"""
monkeypatch.setattr(
"fotoobo.fortinet.fortinet.requests.Session.post",
MagicMock(
return_value=ResponseMock(
headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
json={
"result": {"retval": 1, "message": "Login successful."},
},
status=200,
)
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
ems = FortiClientEMS("host_1", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host_1:443/api/v1"
assert ems.login() == 200

@staticmethod
def test_login_with_csrf_token_not_found(temp_dir: Path, monkeypatch: MonkeyPatch) -> None:
"""Test the login to a FortiClient EMS when the csrf token was not found in the headers"""
monkeypatch.setattr(
"fotoobo.fortinet.fortinet.requests.Session.post",
MagicMock(
return_value=ResponseMock(
headers={"Set-Cookie": "csrftoken_missing=dummy_csrf_token;"},
json={
"result": {"retval": 1, "message": "Login successful."},
},
status=200,
)
),
)
ems = FortiClientEMS("host_2", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host_2:443/api/v1"
assert ems.login() == 200

@staticmethod
Expand Down Expand Up @@ -143,7 +179,7 @@ def test_get_version_ok(monkeypatch: MonkeyPatch) -> None:
response = ems.get_version()
requests.Session.get.assert_called_with(
"https://host:443/api/v1/system/consts/get?system_update_time=1",
headers=None,
headers=ANY,
json=None,
params=None,
timeout=3,
Expand All @@ -167,7 +203,7 @@ def test_get_version_invalid(monkeypatch: MonkeyPatch) -> None:
assert "Did not find any FortiClient EMS version number in response" in str(err.value)
requests.Session.get.assert_called_with(
"https://host:443/api/v1/system/consts/get?system_update_time=1",
headers=None,
headers=ANY,
json=None,
params=None,
timeout=3,
Expand Down
1 change: 1 addition & 0 deletions tests/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self, **kwargs: Any) -> None:
status (int, optional): http status code. Defaults to 444 (No Response)
"""
self.text = kwargs.get("text", "")
self.headers = kwargs.get("headers", [])
self.json = MagicMock(return_value=kwargs.get("json", None))
self.status_code = kwargs.get("status", 444)
self.ok = kwargs.get("ok", True)
Expand Down