Skip to content

Commit

Permalink
Handle 402 responses from hub
Browse files Browse the repository at this point in the history
  • Loading branch information
itssimon committed Nov 26, 2024
1 parent a2cff82 commit 70b7ece
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 19 deletions.
13 changes: 8 additions & 5 deletions apitally/client/client_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import random
import time
from contextlib import suppress
from functools import partial
from typing import Any, AsyncIterator, Dict, Optional, Tuple
from uuid import UUID
Expand Down Expand Up @@ -62,11 +63,7 @@ async def _run_sync_loop(self) -> None:
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")

try:
self.request_logger.maybe_rotate_file()
except Exception: # pragma: no cover
logger.exception("An error occurred while rotating request log files")

self.request_logger.maintain()
await asyncio.sleep(1)

def stop_sync_loop(self) -> None:
Expand Down Expand Up @@ -147,6 +144,12 @@ async def _send_sync_data(self, client: httpx.AsyncClient, data: Dict[str, Any])
async def _send_log_data(self, client: httpx.AsyncClient, uuid: UUID, stream: AsyncIterator[bytes]) -> None:
logger.debug("Streaming request log data to Apitally hub")
response = await client.post(url=f"{self.hub_url}/log?uuid={uuid}", content=stream)
if response.status_code == 402 and "Retry-After" in response.headers:
with suppress(ValueError):
retry_after = int(response.headers["Retry-After"])
self.request_logger.suspend_until = time.time() + retry_after
self.request_logger.clear()
return
self._handle_hub_response(response)

def _handle_hub_response(self, response: httpx.Response) -> None:
Expand Down
13 changes: 8 additions & 5 deletions apitally/client/client_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
import time
from contextlib import suppress
from functools import partial
from io import BufferedReader
from queue import Queue
Expand Down Expand Up @@ -80,11 +81,7 @@ def _run_sync_loop(self) -> None:
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")

try:
self.request_logger.maybe_rotate_file()
except Exception: # pragma: no cover
logger.exception("An error occurred while rotating request log files")

self.request_logger.maintain()
time.sleep(1)
finally:
# Send any remaining data before exiting
Expand Down Expand Up @@ -162,6 +159,12 @@ def _send_sync_data(self, session: requests.Session, data: Dict[str, Any]) -> No
def _send_log_data(self, session: requests.Session, uuid: UUID, fp: BufferedReader) -> None:
logger.debug("Streaming request log data to Apitally hub")
response = session.post(url=f"{self.hub_url}/log?uuid={uuid}", data=fp, timeout=REQUEST_TIMEOUT)
if response.status_code == 402 and "Retry-After" in response.headers:
with suppress(ValueError):
retry_after = int(response.headers["Retry-After"])
self.request_logger.suspend_until = time.time() + retry_after
self.request_logger.clear()
return
self._handle_hub_response(response)

def _handle_hub_response(self, response: requests.Response) -> None:
Expand Down
19 changes: 14 additions & 5 deletions apitally/client/request_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import tempfile
import threading
import time
from collections import deque
from contextlib import suppress
from dataclasses import dataclass, field
Expand Down Expand Up @@ -149,13 +150,14 @@ def __init__(self, config: Optional[RequestLoggingConfig]) -> None:
self.file_deque: deque[TempGzipFile] = deque([])
self.file: Optional[TempGzipFile] = None
self.lock = threading.Lock()
self.suspend_until: Optional[float] = None

@property
def current_file_size(self) -> int:
return self.file.size if self.file is not None else 0

def log_request(self, request: RequestDict, response: ResponseDict) -> None:
if not self.enabled:
if not self.enabled or self.suspend_until is not None:
return
parsed_url = urlparse(request["url"])
if self._should_exclude_path(request["path"] or parsed_url.path):
Expand Down Expand Up @@ -241,18 +243,25 @@ def rotate_file(self) -> None:
self.file_deque.append(self.file)
self.file = None

def maybe_rotate_file(self) -> None:
def maintain(self) -> None:
if self.current_file_size > MAX_FILE_SIZE:
self.rotate_file()

Check warning on line 248 in apitally/client/request_logging.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/request_logging.py#L248

Added line #L248 was not covered by tests
while len(self.file_deque) > MAX_FILES_IN_DEQUE:
file = self.file_deque.popleft()
file.delete()

Check warning on line 251 in apitally/client/request_logging.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/request_logging.py#L250-L251

Added lines #L250 - L251 were not covered by tests
if self.suspend_until is not None and self.suspend_until < time.time():
self.suspend_until = None

Check warning on line 253 in apitally/client/request_logging.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/request_logging.py#L253

Added line #L253 was not covered by tests

def close(self) -> None:
self.enabled = False
def clear(self) -> None:
self.write_deque.clear()
self.rotate_file()
for file in self.file_deque:
file.delete()

Check warning on line 259 in apitally/client/request_logging.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/request_logging.py#L259

Added line #L259 was not covered by tests
self.file_deque.clear()

def close(self) -> None:
self.enabled = False
self.clear()

@lru_cache(maxsize=1000)
def _should_exclude_path(self, url_path: str) -> bool:
Expand Down Expand Up @@ -291,7 +300,7 @@ def _has_supported_content_type(headers: List[Tuple[str, str]]) -> bool:
return content_type is not None and any(content_type.startswith(t) for t in ALLOWED_CONTENT_TYPES)


def _check_writable_fs():
def _check_writable_fs() -> bool:
try:
with tempfile.NamedTemporaryFile():
return True
Expand Down
69 changes: 67 additions & 2 deletions tests/test_client_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

import asyncio
import gzip
import json
import re
import time
from typing import TYPE_CHECKING

import pytest
Expand All @@ -17,9 +20,13 @@

@pytest.fixture(scope="module")
async def client() -> ApitallyClient:
from apitally.client.client_asyncio import ApitallyClient
from apitally.client.client_asyncio import ApitallyClient, RequestLoggingConfig

client = ApitallyClient(client_id=CLIENT_ID, env=ENV)
client = ApitallyClient(
client_id=CLIENT_ID,
env=ENV,
request_logging_config=RequestLoggingConfig(enabled=True),
)
client.request_counter.add_request(
consumer=None,
method="GET",
Expand Down Expand Up @@ -56,6 +63,29 @@ async def client() -> ApitallyClient:
return client


def log_request(client: ApitallyClient) -> None:
client.request_logger.log_request(
request={
"timestamp": time.time(),
"method": "GET",
"path": "/test",
"url": "http://testserver/test",
"headers": [],
"size": 0,
"consumer": None,
"body": None,
},
response={
"status_code": 200,
"response_time": 0.105,
"headers": [],
"size": 0,
"body": None,
},
)
client.request_logger.write_to_file()


async def test_sync_loop(client: ApitallyClient, mocker: MockerFixture):
send_sync_data_mock = mocker.patch("apitally.client.client_asyncio.ApitallyClient.send_sync_data")
mocker.patch("apitally.client.client_base.INITIAL_SYNC_INTERVAL", 0.05)
Expand Down Expand Up @@ -83,6 +113,41 @@ async def test_send_sync_data(client: ApitallyClient, httpx_mock: HTTPXMock):
assert request_data["validation_errors"][0]["error_count"] == 1


async def test_send_log_data(client: ApitallyClient, httpx_mock: HTTPXMock):
from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION

log_request(client)
httpx_mock.add_response()
async with client.get_http_client() as http_client:
await client.send_log_data(client=http_client)

url_pattern = re.compile(rf"{HUB_BASE_URL}/{HUB_VERSION}/{CLIENT_ID}/{ENV}/log\?uuid=[a-f0-9-]+$")
requests = httpx_mock.get_requests(url=url_pattern)
assert len(requests) == 1
json_lines = gzip.decompress(requests[0].content).strip().split(b"\n")
assert len(json_lines) == 1
json_data = json.loads(json_lines[0])
assert json_data["request"]["path"] == "/test"
assert json_data["response"]["status_code"] == 200
httpx_mock.reset()

# Test 402 response with Retry-After header
log_request(client)
httpx_mock.add_response(status_code=402, headers={"Retry-After": "3600"})
async with client.get_http_client() as http_client:
await client.send_log_data(client=http_client)
requests = httpx_mock.get_requests(url=url_pattern)
assert len(requests) == 1
assert client.request_logger.suspend_until is not None
assert client.request_logger.suspend_until > time.time() + 3590

# Ensure not logging requests anymore
log_request(client)
assert client.request_logger.file is None
assert len(client.request_logger.write_deque) == 0
assert len(client.request_logger.file_deque) == 0


async def test_set_startup_data(client: ApitallyClient, httpx_mock: HTTPXMock):
from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION

Expand Down
63 changes: 61 additions & 2 deletions tests/test_client_threading.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import re
import time
from typing import TYPE_CHECKING

Expand All @@ -17,9 +18,13 @@

@pytest.fixture(scope="module")
def client() -> ApitallyClient:
from apitally.client.client_threading import ApitallyClient
from apitally.client.client_threading import ApitallyClient, RequestLoggingConfig

client = ApitallyClient(client_id=CLIENT_ID, env=ENV)
client = ApitallyClient(
client_id=CLIENT_ID,
env=ENV,
request_logging_config=RequestLoggingConfig(enabled=True),
)
client.request_counter.add_request(
consumer=None,
method="GET",
Expand Down Expand Up @@ -56,6 +61,29 @@ def client() -> ApitallyClient:
return client


def log_request(client: ApitallyClient) -> None:
client.request_logger.log_request(
request={
"timestamp": time.time(),
"method": "GET",
"path": "/test",
"url": "http://testserver/test",
"headers": [],
"size": 0,
"consumer": None,
"body": None,
},
response={
"status_code": 200,
"response_time": 0.105,
"headers": [],
"size": 0,
"body": None,
},
)
client.request_logger.write_to_file()


def test_sync_loop(client: ApitallyClient, mocker: MockerFixture):
send_sync_data_mock = mocker.patch("apitally.client.client_threading.ApitallyClient.send_sync_data")
mocker.patch("apitally.client.client_base.INITIAL_SYNC_INTERVAL", 0.05)
Expand All @@ -82,6 +110,37 @@ def test_send_sync_data(client: ApitallyClient, requests_mock: Mocker):
assert request_data["validation_errors"][0]["error_count"] == 1


def test_send_log_data(client: ApitallyClient, requests_mock: Mocker):
from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION

log_request(client)
url_pattern = re.compile(rf"{HUB_BASE_URL}/{HUB_VERSION}/{CLIENT_ID}/{ENV}/log\?uuid=[a-f0-9-]+$")
mock = requests_mock.register_uri("POST", url_pattern)
with requests.Session() as session:
client.send_log_data(session)

assert len(mock.request_history) == 1
# Ideally we'd also check the request body for correctness, but the following issue prevents us from doing so:
# https://github.com/jamielennox/requests-mock/issues/243
requests_mock.reset()

# Test 402 response with Retry-After header
log_request(client)
mock = requests_mock.register_uri("POST", url_pattern, status_code=402, headers={"Retry-After": "3600"})
with requests.Session() as session:
client.send_log_data(session)

assert len(mock.request_history) == 1
assert client.request_logger.suspend_until is not None
assert client.request_logger.suspend_until > time.time() + 3590

# Ensure not logging requests anymore
log_request(client)
assert client.request_logger.file is None
assert len(client.request_logger.write_deque) == 0
assert len(client.request_logger.file_deque) == 0


def test_set_startup_data(client: ApitallyClient, requests_mock: Mocker):
from apitally.client.client_base import HUB_BASE_URL, HUB_VERSION

Expand Down

0 comments on commit 70b7ece

Please sign in to comment.