diff --git a/apitally/client/client_asyncio.py b/apitally/client/client_asyncio.py index 995a6f4..d41cd6c 100644 --- a/apitally/client/client_asyncio.py +++ b/apitally/client/client_asyncio.py @@ -4,6 +4,7 @@ import logging import random import time +from contextlib import suppress from functools import partial from typing import Any, AsyncIterator, Dict, Optional, Tuple from uuid import UUID @@ -62,11 +63,7 @@ async def _run_sync_loop(self) -> None: except Exception: # pragma: no cover logger.exception("An error occurred during sync with Apitally hub") - try: - self.request_logger.maybe_rotate_file() - except Exception: # pragma: no cover - logger.exception("An error occurred while rotating request log files") - + self.request_logger.maintain() await asyncio.sleep(1) def stop_sync_loop(self) -> None: @@ -147,6 +144,12 @@ async def _send_sync_data(self, client: httpx.AsyncClient, data: Dict[str, Any]) async def _send_log_data(self, client: httpx.AsyncClient, uuid: UUID, stream: AsyncIterator[bytes]) -> None: logger.debug("Streaming request log data to Apitally hub") response = await client.post(url=f"{self.hub_url}/log?uuid={uuid}", content=stream) + if response.status_code == 402 and "Retry-After" in response.headers: + with suppress(ValueError): + retry_after = int(response.headers["Retry-After"]) + self.request_logger.suspend_until = time.time() + retry_after + self.request_logger.clear() + return self._handle_hub_response(response) def _handle_hub_response(self, response: httpx.Response) -> None: diff --git a/apitally/client/client_threading.py b/apitally/client/client_threading.py index badc1d2..13b3f5b 100644 --- a/apitally/client/client_threading.py +++ b/apitally/client/client_threading.py @@ -3,6 +3,7 @@ import logging import random import time +from contextlib import suppress from functools import partial from io import BufferedReader from queue import Queue @@ -80,11 +81,7 @@ def _run_sync_loop(self) -> None: except Exception: # pragma: no cover logger.exception("An error occurred during sync with Apitally hub") - try: - self.request_logger.maybe_rotate_file() - except Exception: # pragma: no cover - logger.exception("An error occurred while rotating request log files") - + self.request_logger.maintain() time.sleep(1) finally: # Send any remaining data before exiting @@ -162,6 +159,12 @@ def _send_sync_data(self, session: requests.Session, data: Dict[str, Any]) -> No def _send_log_data(self, session: requests.Session, uuid: UUID, fp: BufferedReader) -> None: logger.debug("Streaming request log data to Apitally hub") response = session.post(url=f"{self.hub_url}/log?uuid={uuid}", data=fp, timeout=REQUEST_TIMEOUT) + if response.status_code == 402 and "Retry-After" in response.headers: + with suppress(ValueError): + retry_after = int(response.headers["Retry-After"]) + self.request_logger.suspend_until = time.time() + retry_after + self.request_logger.clear() + return self._handle_hub_response(response) def _handle_hub_response(self, response: requests.Response) -> None: diff --git a/apitally/client/request_logging.py b/apitally/client/request_logging.py index b4e0bbd..dbf10a1 100644 --- a/apitally/client/request_logging.py +++ b/apitally/client/request_logging.py @@ -3,6 +3,7 @@ import re import tempfile import threading +import time from collections import deque from contextlib import suppress from dataclasses import dataclass, field @@ -149,13 +150,14 @@ def __init__(self, config: Optional[RequestLoggingConfig]) -> None: self.file_deque: deque[TempGzipFile] = deque([]) self.file: Optional[TempGzipFile] = None self.lock = threading.Lock() + self.suspend_until: Optional[float] = None @property def current_file_size(self) -> int: return self.file.size if self.file is not None else 0 def log_request(self, request: RequestDict, response: ResponseDict) -> None: - if not self.enabled: + if not self.enabled or self.suspend_until is not None: return parsed_url = urlparse(request["url"]) if self._should_exclude_path(request["path"] or parsed_url.path): @@ -241,18 +243,25 @@ def rotate_file(self) -> None: self.file_deque.append(self.file) self.file = None - def maybe_rotate_file(self) -> None: + def maintain(self) -> None: if self.current_file_size > MAX_FILE_SIZE: self.rotate_file() while len(self.file_deque) > MAX_FILES_IN_DEQUE: file = self.file_deque.popleft() file.delete() + if self.suspend_until is not None and self.suspend_until < time.time(): + self.suspend_until = None - def close(self) -> None: - self.enabled = False + def clear(self) -> None: + self.write_deque.clear() self.rotate_file() for file in self.file_deque: file.delete() + self.file_deque.clear() + + def close(self) -> None: + self.enabled = False + self.clear() @lru_cache(maxsize=1000) def _should_exclude_path(self, url_path: str) -> bool: @@ -291,7 +300,7 @@ def _has_supported_content_type(headers: List[Tuple[str, str]]) -> bool: return content_type is not None and any(content_type.startswith(t) for t in ALLOWED_CONTENT_TYPES) -def _check_writable_fs(): +def _check_writable_fs() -> bool: try: with tempfile.NamedTemporaryFile(): return True diff --git a/tests/test_client_asyncio.py b/tests/test_client_asyncio.py index e74acc7..c1852e8 100644 --- a/tests/test_client_asyncio.py +++ b/tests/test_client_asyncio.py @@ -1,7 +1,10 @@ from __future__ import annotations import asyncio +import gzip import json +import re +import time from typing import TYPE_CHECKING import pytest @@ -17,9 +20,13 @@ @pytest.fixture(scope="module") async def client() -> ApitallyClient: - from apitally.client.client_asyncio import ApitallyClient + from apitally.client.client_asyncio import ApitallyClient, RequestLoggingConfig - client = ApitallyClient(client_id=CLIENT_ID, env=ENV) + client = ApitallyClient( + client_id=CLIENT_ID, + env=ENV, + request_logging_config=RequestLoggingConfig(enabled=True), + ) client.request_counter.add_request( consumer=None, method="GET", @@ -56,6 +63,29 @@ async def client() -> ApitallyClient: return client +def log_request(client: ApitallyClient) -> None: + client.request_logger.log_request( + request={ + "timestamp": time.time(), + "method": "GET", + "path": "/test", + "url": "http://testserver/test", + "headers": [], + "size": 0, + "consumer": None, + "body": None, + }, + response={ + "status_code": 200, + "response_time": 0.105, + "headers": [], + "size": 0, + "body": None, + }, + ) + client.request_logger.write_to_file() + + async def test_sync_loop(client: ApitallyClient, mocker: MockerFixture): send_sync_data_mock = mocker.patch("apitally.client.client_asyncio.ApitallyClient.send_sync_data") mocker.patch("apitally.client.client_base.INITIAL_SYNC_INTERVAL", 0.05) @@ -83,6 +113,41 @@ async def test_send_sync_data(client: ApitallyClient, httpx_mock: HTTPXMock): assert request_data["validation_errors"][0]["error_count"] == 1 +async def test_send_log_data(client: ApitallyClient, httpx_mock: HTTPXMock): + from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION + + log_request(client) + httpx_mock.add_response() + async with client.get_http_client() as http_client: + await client.send_log_data(client=http_client) + + url_pattern = re.compile(rf"{HUB_BASE_URL}/{HUB_VERSION}/{CLIENT_ID}/{ENV}/log\?uuid=[a-f0-9-]+$") + requests = httpx_mock.get_requests(url=url_pattern) + assert len(requests) == 1 + json_lines = gzip.decompress(requests[0].content).strip().split(b"\n") + assert len(json_lines) == 1 + json_data = json.loads(json_lines[0]) + assert json_data["request"]["path"] == "/test" + assert json_data["response"]["status_code"] == 200 + httpx_mock.reset() + + # Test 402 response with Retry-After header + log_request(client) + httpx_mock.add_response(status_code=402, headers={"Retry-After": "3600"}) + async with client.get_http_client() as http_client: + await client.send_log_data(client=http_client) + requests = httpx_mock.get_requests(url=url_pattern) + assert len(requests) == 1 + assert client.request_logger.suspend_until is not None + assert client.request_logger.suspend_until > time.time() + 3590 + + # Ensure not logging requests anymore + log_request(client) + assert client.request_logger.file is None + assert len(client.request_logger.write_deque) == 0 + assert len(client.request_logger.file_deque) == 0 + + async def test_set_startup_data(client: ApitallyClient, httpx_mock: HTTPXMock): from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION diff --git a/tests/test_client_threading.py b/tests/test_client_threading.py index 0f79fe5..869c97b 100644 --- a/tests/test_client_threading.py +++ b/tests/test_client_threading.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re import time from typing import TYPE_CHECKING @@ -17,9 +18,13 @@ @pytest.fixture(scope="module") def client() -> ApitallyClient: - from apitally.client.client_threading import ApitallyClient + from apitally.client.client_threading import ApitallyClient, RequestLoggingConfig - client = ApitallyClient(client_id=CLIENT_ID, env=ENV) + client = ApitallyClient( + client_id=CLIENT_ID, + env=ENV, + request_logging_config=RequestLoggingConfig(enabled=True), + ) client.request_counter.add_request( consumer=None, method="GET", @@ -56,6 +61,29 @@ def client() -> ApitallyClient: return client +def log_request(client: ApitallyClient) -> None: + client.request_logger.log_request( + request={ + "timestamp": time.time(), + "method": "GET", + "path": "/test", + "url": "http://testserver/test", + "headers": [], + "size": 0, + "consumer": None, + "body": None, + }, + response={ + "status_code": 200, + "response_time": 0.105, + "headers": [], + "size": 0, + "body": None, + }, + ) + client.request_logger.write_to_file() + + def test_sync_loop(client: ApitallyClient, mocker: MockerFixture): send_sync_data_mock = mocker.patch("apitally.client.client_threading.ApitallyClient.send_sync_data") mocker.patch("apitally.client.client_base.INITIAL_SYNC_INTERVAL", 0.05) @@ -82,6 +110,37 @@ def test_send_sync_data(client: ApitallyClient, requests_mock: Mocker): assert request_data["validation_errors"][0]["error_count"] == 1 +def test_send_log_data(client: ApitallyClient, requests_mock: Mocker): + from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION + + log_request(client) + url_pattern = re.compile(rf"{HUB_BASE_URL}/{HUB_VERSION}/{CLIENT_ID}/{ENV}/log\?uuid=[a-f0-9-]+$") + mock = requests_mock.register_uri("POST", url_pattern) + with requests.Session() as session: + client.send_log_data(session) + + assert len(mock.request_history) == 1 + # Ideally we'd also check the request body for correctness, but the following issue prevents us from doing so: + # https://github.com/jamielennox/requests-mock/issues/243 + requests_mock.reset() + + # Test 402 response with Retry-After header + log_request(client) + mock = requests_mock.register_uri("POST", url_pattern, status_code=402, headers={"Retry-After": "3600"}) + with requests.Session() as session: + client.send_log_data(session) + + assert len(mock.request_history) == 1 + assert client.request_logger.suspend_until is not None + assert client.request_logger.suspend_until > time.time() + 3590 + + # Ensure not logging requests anymore + log_request(client) + assert client.request_logger.file is None + assert len(client.request_logger.write_deque) == 0 + assert len(client.request_logger.file_deque) == 0 + + def test_set_startup_data(client: ApitallyClient, requests_mock: Mocker): from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION