From e8c662cd54db17f9ee036e303a46dbac257ed123 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Thu, 22 Feb 2024 12:40:20 +0100 Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=8E=A8=20Make=20fortinet.api=20method?= =?UTF-8?q?=20more=20generic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fotoobo/fortinet/fortinet.py | 77 +++++++++++++-------------- tests/fortinet/test_fortianalyzer.py | 3 +- tests/fortinet/test_forticlientems.py | 12 +++-- tests/fortinet/test_fortimanager.py | 49 +++++++++++------ tests/fortinet/test_fortinet.py | 56 +++++++++---------- 5 files changed, 109 insertions(+), 88 deletions(-) diff --git a/fotoobo/fortinet/fortinet.py b/fotoobo/fortinet/fortinet.py index 560bf00..1988539 100644 --- a/fotoobo/fortinet/fortinet.py +++ b/fotoobo/fortinet/fortinet.py @@ -2,6 +2,7 @@ This is the Fortinet abstract base class (ABC) which is used to define some global and generic variables and methods. """ + import logging from abc import ABC, abstractmethod from time import time @@ -22,6 +23,8 @@ class Fortinet(ABC): defined here with the abstractmethod decorator. """ + ALLOWED_HTTP_METHODS = ["GET", "POST"] + def __init__(self, hostname: str, **kwargs: Any) -> None: """ Set some initial parameters for the Fortinet super class. @@ -77,43 +80,49 @@ def api( # pylint: disable=too-many-arguments API request to a Fortinet device. Args: - method: Request method from [get, post] - url: Rest API URL to request data from - headers: Dictionary with headers (if needed) - params: Dictionary with parameters (if needed) - payload: JSON body for post requests (if needed) - timeout: The requests read timeout + method: HTTP request method + url: Rest API URL to request data from + headers: Dictionary with headers (if needed) + params: Dictionary with parameters (if needed) + payload: JSON body for post requests (if needed) + timeout: The requests read timeout Returns: Response from the request """ full_url = f"{self.api_url}/{url.strip('/')}".strip("/") - params = params or {} - payload = payload or {} timeout = timeout or self.timeout start = time() + if method.upper() not in self.ALLOWED_HTTP_METHODS: + error = f"HTTP method '{method.lower()}' is not implemented" + log.error(error) + raise NotImplementedError(error) + try: - if method.lower() == "get": - response = self.session.get( - full_url, - params=params, - verify=self.ssl_verify, - timeout=timeout, - ) - - elif method.lower() == "post": - response = self.session.post( - full_url, - headers=headers, - json=payload, - verify=self.ssl_verify, - timeout=timeout, - ) - - else: - log.debug("Unknown API method") - raise GeneralError("Unknown API method") + response: requests.Response = getattr(self.session, method.lower())( + full_url, + headers=headers, + json=payload, + params=params, + timeout=timeout, + verify=self.ssl_verify, + ) + + except requests.exceptions.SSLError as err: + log.debug(err) + error = "Unknown SSL error" + try: + if ( + err.args[0].reason.args[0].verify_message + == "unable to get local issuer certificate" + ): + error = "Unable to get local issuer certificate" + + except (AttributeError, IndexError): + pass + + raise GeneralError(f"{error} ({self.hostname})") from err except requests.exceptions.ConnectTimeout as err: log.debug(err) @@ -122,7 +131,6 @@ def api( # pylint: disable=too-many-arguments except requests.exceptions.ConnectionError as err: log.debug(err) error = "Unknown connection error" - try: if "Name or service not known" in err.args[0].reason.args[0]: error = "Name or service not known" @@ -133,20 +141,11 @@ def api( # pylint: disable=too-many-arguments except (IndexError, AttributeError, TypeError): pass - try: - if ( - err.args[0].reason.args[0].verify_message - == "unable to get local issuer certificate" - ): - error = "Unable to get local issuer certificate" - except (IndexError, AttributeError, TypeError): - pass - raise GeneralError(f"{error} ({self.hostname})") from err except requests.exceptions.ReadTimeout as err: log.error(err) - raise GeneralError(f"read timeout ({self.hostname})") from err + raise GeneralError(f"Read timeout ({self.hostname})") from err log.debug( 'Request time: [bold green]%2dms[/] "%s %s"', diff --git a/tests/fortinet/test_fortianalyzer.py b/tests/fortinet/test_fortianalyzer.py index 44d5955..774265b 100644 --- a/tests/fortinet/test_fortianalyzer.py +++ b/tests/fortinet/test_fortianalyzer.py @@ -42,6 +42,7 @@ def test_get_version(response: MagicMock, expected: str, monkeypatch: MonkeyPatc "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/sys/status"}], "session": ""}, - verify=True, + params=None, timeout=3, + verify=True, ) diff --git a/tests/fortinet/test_forticlientems.py b/tests/fortinet/test_forticlientems.py index 3e2c315..e0ec1a1 100644 --- a/tests/fortinet/test_forticlientems.py +++ b/tests/fortinet/test_forticlientems.py @@ -143,9 +143,11 @@ def test_get_version_ok(monkeypatch: MonkeyPatch) -> None: response = ems.get_version() requests.Session.get.assert_called_with( "https://host:443/api/v1/system/consts/get?system_update_time=1", - params={}, - verify=True, + headers=None, + json=None, + params=None, timeout=3, + verify=True, ) assert response == "1.2.3" @@ -165,9 +167,11 @@ def test_get_version_invalid(monkeypatch: MonkeyPatch) -> None: assert "Did not find any FortiClient EMS version number in response" in str(err.value) requests.Session.get.assert_called_with( "https://host:443/api/v1/system/consts/get?system_update_time=1", - params={}, - verify=False, + headers=None, + json=None, + params=None, timeout=3, + verify=False, ) @staticmethod diff --git a/tests/fortinet/test_fortimanager.py b/tests/fortinet/test_fortimanager.py index f9aeee8..e2dafce 100644 --- a/tests/fortinet/test_fortimanager.py +++ b/tests/fortinet/test_fortimanager.py @@ -1,6 +1,7 @@ """ Test the FortiManager class """ + # pylint: disable=no-member from unittest.mock import MagicMock @@ -54,8 +55,9 @@ def test_assign_all_objects(monkeypatch: MonkeyPatch) -> None: ], "session": "", }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -85,8 +87,9 @@ def test_assign_all_objects_http_404(monkeypatch: MonkeyPatch) -> None: ], "session": "", }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -130,8 +133,9 @@ def test_assign_all_objects_status_not_ok(monkeypatch: MonkeyPatch) -> None: ], "session": "", }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -150,8 +154,9 @@ def test_get_adoms(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/dvmdb/adom"}], "session": ""}, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -188,8 +193,9 @@ def test_get_version(response: MagicMock, expected: str, monkeypatch: MonkeyPatc "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/sys/status"}], "session": ""}, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -219,8 +225,9 @@ def test_login(monkeypatch: MonkeyPatch) -> None: "method": "exec", "params": [{"data": {"passwd": "pass", "user": "user"}, "url": "/sys/login/user"}], }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -256,8 +263,9 @@ def test_login_with_session_path(monkeypatch: MonkeyPatch) -> None: "params": [{"url": "/sys/status"}], "session": "dummy_session_key", }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -294,8 +302,9 @@ def test_login_with_session_path_invalid_key(monkeypatch: MonkeyPatch) -> None: "method": "exec", "params": [{"data": {"passwd": "pass", "user": "user"}, "url": "/sys/login/user"}], }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -332,8 +341,9 @@ def test_login_with_session_path_not_found(temp_dir: str, monkeypatch: MonkeyPat "method": "exec", "params": [{"data": {"passwd": "pass", "user": "user"}, "url": "/sys/login/user"}], }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -362,8 +372,9 @@ def test_logout(monkeypatch: MonkeyPatch) -> None: "params": [{"url": "/sys/logout"}], "session": "dummy_session_key", }, - verify=True, + params=None, timeout=3, + verify=True, ) @staticmethod @@ -380,8 +391,9 @@ def test_post_single(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, - verify=True, + params=None, timeout=10, + verify=True, ) @staticmethod @@ -398,8 +410,9 @@ def test_post_multiple(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, - verify=True, + params=None, timeout=10, + verify=True, ) @staticmethod @@ -416,8 +429,9 @@ def test_post_single_global(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "global"}], "session": ""}, - verify=True, + params=None, timeout=10, + verify=True, ) @staticmethod @@ -441,8 +455,9 @@ def test_post_response_error(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, - verify=True, + params=None, timeout=10, + verify=True, ) @staticmethod @@ -459,8 +474,9 @@ def test_post_http_error(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, - verify=True, + params=None, timeout=10, + verify=True, ) @staticmethod @@ -501,6 +517,7 @@ def test_wait_for_task(monkeypatch: MonkeyPatch) -> None: "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/task/task/222/line"}], "session": ""}, - verify=True, + params=None, timeout=3, + verify=True, ) diff --git a/tests/fortinet/test_fortinet.py b/tests/fortinet/test_fortinet.py index 051439d..1fa0ed1 100644 --- a/tests/fortinet/test_fortinet.py +++ b/tests/fortinet/test_fortinet.py @@ -62,7 +62,9 @@ def test_api_get(monkeypatch: MonkeyPatch) -> None: response = FortinetTestClass("dummy").api("get", "url") assert response.status_code == 200 assert response.json() == {"version": "v1.1.1"} - requests.Session.get.assert_called_with("url", params={}, verify=True, timeout=3) + requests.Session.get.assert_called_with( + "url", headers=None, json=None, params=None, timeout=3, verify=True + ) @staticmethod def test_api_post(monkeypatch: MonkeyPatch) -> None: @@ -75,7 +77,7 @@ def test_api_post(monkeypatch: MonkeyPatch) -> None: assert response.status_code == 200 assert response.json() == {"version": "v1.1.1"} requests.Session.post.assert_called_with( - "url", headers=None, json={}, verify=True, timeout=3 + "url", headers=None, json=None, params=None, timeout=3, verify=True ) @staticmethod @@ -88,9 +90,8 @@ def test_api_get_connection_timeout(method: str, monkeypatch: MonkeyPatch) -> No f"fotoobo.fortinet.fortinet.requests.Session.{method}", MagicMock(side_effect=requests.exceptions.ConnectTimeout()), ) - with pytest.raises(GeneralError) as err: + with pytest.raises(GeneralError, match=r"Connection timeout \(dummy\)"): FortinetTestClass("dummy").api(method, "url") - assert "Connection timeout" in str(err.value) @staticmethod @pytest.mark.parametrize( @@ -102,19 +103,14 @@ def test_api_get_read_timeout(method: str, monkeypatch: MonkeyPatch) -> None: f"fotoobo.fortinet.fortinet.requests.Session.{method}", MagicMock(side_effect=requests.exceptions.ReadTimeout()), ) - with pytest.raises(GeneralError) as err: + with pytest.raises(GeneralError, match=r"Read timeout \(dummy\)"): FortinetTestClass("dummy").api(method, "url") - assert "read timeout" in str(err.value) @staticmethod def test_api_unknown_method() -> None: """Test api with unknown method""" - try: + with pytest.raises(NotImplementedError, match=r"HTTP method 'dummy' is not implemented"): FortinetTestClass("dummy").api("dummy", "url") - assert False - except GeneralError as err: - assert True - assert "Unknown API method" in err.message @staticmethod @pytest.mark.parametrize( @@ -159,9 +155,7 @@ def test_api_connection_error( f"fotoobo.fortinet.fortinet.requests.Session.{method}", MagicMock( side_effect=requests.exceptions.ConnectionError( - MagicMock( - reason=NewConnectionError(reason, message=""), - ), + MagicMock(reason=NewConnectionError(reason, message="")), ) ), ) @@ -173,28 +167,34 @@ def test_api_connection_error( @pytest.mark.parametrize( "method", (pytest.param("get", id="get"), pytest.param("post", id="post")) ) - def test_api_connection_error_cert_check(method: str, monkeypatch: MonkeyPatch) -> None: + @pytest.mark.parametrize( + "ssl_error, expected", + ( + pytest.param( + SSLError(MagicMock(verify_message="unable to get local issuer certificate")), + r"Unable to get local issuer certificate \(dummy\)", + id="unknown cert", + ), + pytest.param( + SSLError(MagicMock(spec=())), + r"Unknown SSL error \(dummy\)", + id="unknown error", + ), + ), + ) + def test_api_ssl_error( + method: str, ssl_error: SSLError, expected: str, monkeypatch: MonkeyPatch + ) -> None: """Test api with connection errors when the cert is not valid We have to do this test especially for the cert_check because its message is not in err.args[0].reason.args[0] but in err.args[0].reason.args[0].verify_message """ monkeypatch.setattr( f"fotoobo.fortinet.fortinet.requests.Session.{method}", - MagicMock( - side_effect=requests.exceptions.ConnectionError( - MagicMock( - reason=SSLError( - MagicMock( - verify_message="unable to get local issuer certificate", - ) - ) - ) - ) - ), + MagicMock(side_effect=requests.exceptions.SSLError(MagicMock(reason=ssl_error))), ) - with pytest.raises(GeneralError) as err: + with pytest.raises(GeneralError, match=expected): FortinetTestClass("dummy").api(method, "url") - assert "Unable to get local issuer certificate" in str(err.value) @staticmethod @pytest.mark.parametrize( From ac30235f6dcf04f0d4c983af2c62d61bcbefaf47 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Thu, 22 Feb 2024 12:46:21 +0100 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=93=9D=20Update=20WHATSNEW?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WHATSNEW.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/WHATSNEW.md b/WHATSNEW.md index 74b2dee..00f4eee 100644 --- a/WHATSNEW.md +++ b/WHATSNEW.md @@ -5,6 +5,9 @@ ### Changed - `fmg get devices` also shows ha nodes if device is a cluster +- Make `Fortinet.api()` method more generic to support more method +- Improve error handling and tests for `Fortinet.api()` method + ### Fixed From 098eb54a02a40046f11015ef32b8eaa59b6c0dd9 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Thu, 22 Feb 2024 12:50:31 +0100 Subject: [PATCH 3/4] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20Typo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WHATSNEW.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/WHATSNEW.md b/WHATSNEW.md index 00f4eee..5fe1a5c 100644 --- a/WHATSNEW.md +++ b/WHATSNEW.md @@ -5,8 +5,8 @@ ### Changed - `fmg get devices` also shows ha nodes if device is a cluster -- Make `Fortinet.api()` method more generic to support more method -- Improve error handling and tests for `Fortinet.api()` method +- Make `Fortinet.api()` more generic to support more methods +- Improve error handling and tests for `Fortinet.api()` ### Fixed From 08d2aa56622b2e97c44a51570946db8aa6831175 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Mon, 26 Feb 2024 09:53:15 +0100 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=8E=A8=20Improve=20some=20small=20det?= =?UTF-8?q?ails?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fotoobo/fortinet/fortinet.py | 6 ++++-- tests/fortinet/test_fortinet.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/fotoobo/fortinet/fortinet.py b/fotoobo/fortinet/fortinet.py index 1988539..e2c0c46 100644 --- a/fotoobo/fortinet/fortinet.py +++ b/fotoobo/fortinet/fortinet.py @@ -94,8 +94,8 @@ def api( # pylint: disable=too-many-arguments timeout = timeout or self.timeout start = time() - if method.upper() not in self.ALLOWED_HTTP_METHODS: - error = f"HTTP method '{method.lower()}' is not implemented" + if method.upper() not in Fortinet.ALLOWED_HTTP_METHODS: + error = f"HTTP method '{method.upper()}' is not implemented" log.error(error) raise NotImplementedError(error) @@ -112,6 +112,7 @@ def api( # pylint: disable=too-many-arguments except requests.exceptions.SSLError as err: log.debug(err) error = "Unknown SSL error" + try: if ( err.args[0].reason.args[0].verify_message @@ -131,6 +132,7 @@ def api( # pylint: disable=too-many-arguments except requests.exceptions.ConnectionError as err: log.debug(err) error = "Unknown connection error" + try: if "Name or service not known" in err.args[0].reason.args[0]: error = "Name or service not known" diff --git a/tests/fortinet/test_fortinet.py b/tests/fortinet/test_fortinet.py index 1fa0ed1..9b3b8b2 100644 --- a/tests/fortinet/test_fortinet.py +++ b/tests/fortinet/test_fortinet.py @@ -109,7 +109,7 @@ def test_api_get_read_timeout(method: str, monkeypatch: MonkeyPatch) -> None: @staticmethod def test_api_unknown_method() -> None: """Test api with unknown method""" - with pytest.raises(NotImplementedError, match=r"HTTP method 'dummy' is not implemented"): + with pytest.raises(NotImplementedError, match=r"HTTP method 'DUMMY' is not implemented"): FortinetTestClass("dummy").api("dummy", "url") @staticmethod