Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request logging #71

Merged
merged 22 commits into from
Dec 2, 2024
69 changes: 54 additions & 15 deletions apitally/client/client_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import logging
import random
import time
from contextlib import suppress
from functools import partial
from typing import Any, Dict, Optional, Tuple
from typing import Any, AsyncIterator, Dict, Optional, Tuple
from uuid import UUID

import backoff
import httpx

from apitally.client.client_base import MAX_QUEUE_TIME, REQUEST_TIMEOUT, ApitallyClientBase
from apitally.client.logging import get_logger
from apitally.client.request_logging import RequestLoggingConfig


logger = get_logger(__name__)
Expand All @@ -26,8 +29,8 @@


class ApitallyClient(ApitallyClientBase):
def __init__(self, client_id: str, env: str) -> None:
super().__init__(client_id=client_id, env=env)
def __init__(self, client_id: str, env: str, request_logging_config: Optional[RequestLoggingConfig] = None) -> None:
super().__init__(client_id=client_id, env=env, request_logging_config=request_logging_config)
self._stop_sync_loop = False
self._sync_loop_task: Optional[asyncio.Task] = None
self._sync_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue()
Expand All @@ -41,20 +44,27 @@
self._sync_loop_task = asyncio.create_task(self._run_sync_loop())

async def _run_sync_loop(self) -> None:
first_iteration = True
last_sync_time = 0.0
while not self._stop_sync_loop:
try:
time_start = time.perf_counter()
async with self.get_http_client() as client:
tasks = [self.send_sync_data(client)]
if not self._startup_data_sent and not first_iteration:
tasks.append(self.send_startup_data(client))
await asyncio.gather(*tasks)
time_elapsed = time.perf_counter() - time_start
await asyncio.sleep(self.sync_interval - time_elapsed)
self.request_logger.write_to_file()
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")
first_iteration = False
logger.exception("An error occurred while writing request logs")

now = time.time()
if (now - last_sync_time) >= self.sync_interval:
try:
async with self.get_http_client() as client:
tasks = [self.send_sync_data(client), self.send_log_data(client)]
if not self._startup_data_sent and last_sync_time > 0: # not on first sync
tasks.append(self.send_startup_data(client))

Check warning on line 60 in apitally/client/client_asyncio.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_asyncio.py#L60

Added line #L60 was not covered by tests
await asyncio.gather(*tasks)
last_sync_time = now
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")

self.request_logger.maintain()
await asyncio.sleep(1)

def stop_sync_loop(self) -> None:
self._stop_sync_loop = True
Expand All @@ -65,6 +75,7 @@
# Send any remaining data before exiting
async with self.get_http_client() as client:
await self.send_sync_data(client)
await self.send_log_data(client)

def set_startup_data(self, data: Dict[str, Any]) -> None:
self._startup_data_sent = False
Expand Down Expand Up @@ -99,10 +110,27 @@
finally:
self._sync_data_queue.task_done()

async def send_log_data(self, client: httpx.AsyncClient) -> None:
self.request_logger.rotate_file()
i = 0
while log_file := self.request_logger.get_file():
if i > 0:
time.sleep(random.uniform(0.1, 0.3))

Check warning on line 118 in apitally/client/client_asyncio.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_asyncio.py#L118

Added line #L118 was not covered by tests
try:
stream = log_file.stream_lines_compressed()
await self._send_log_data(client, log_file.uuid, stream)
log_file.delete()
except httpx.HTTPError:
self.request_logger.retry_file_later(log_file)
break

Check warning on line 125 in apitally/client/client_asyncio.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_asyncio.py#L123-L125

Added lines #L123 - L125 were not covered by tests
i += 1
if i >= 10:
break

Check warning on line 128 in apitally/client/client_asyncio.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_asyncio.py#L128

Added line #L128 was not covered by tests

@retry(raise_on_giveup=False)
async def _send_startup_data(self, client: httpx.AsyncClient, data: Dict[str, Any]) -> None:
logger.debug("Sending startup data to Apitally hub")
response = await client.post(url="/startup", json=data, timeout=REQUEST_TIMEOUT)
response = await client.post(url="/startup", json=data)
self._handle_hub_response(response)
self._startup_data_sent = True
self._startup_data = None
Expand All @@ -113,6 +141,17 @@
response = await client.post(url="/sync", json=data)
self._handle_hub_response(response)

async def _send_log_data(self, client: httpx.AsyncClient, uuid: UUID, stream: AsyncIterator[bytes]) -> None:
logger.debug("Streaming request log data to Apitally hub")
response = await client.post(url=f"{self.hub_url}/log?uuid={uuid}", content=stream)
if response.status_code == 402 and "Retry-After" in response.headers:
with suppress(ValueError):
retry_after = int(response.headers["Retry-After"])
self.request_logger.suspend_until = time.time() + retry_after
self.request_logger.clear()
return
self._handle_hub_response(response)

def _handle_hub_response(self, response: httpx.Response) -> None:
if response.status_code == 404:
self.stop_sync_loop()
Expand Down
4 changes: 3 additions & 1 deletion apitally/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from apitally.client.consumers import ConsumerRegistry
from apitally.client.logging import get_logger
from apitally.client.request_logging import RequestLogger, RequestLoggingConfig
from apitally.client.requests import RequestCounter
from apitally.client.server_errors import ServerErrorCounter
from apitally.client.validation_errors import ValidationErrorCounter
Expand Down Expand Up @@ -39,7 +40,7 @@ def __new__(cls: Type[TApitallyClient], *args, **kwargs) -> TApitallyClient:
cls._instance = super().__new__(cls)
return cast(TApitallyClient, cls._instance)

def __init__(self, client_id: str, env: str) -> None:
def __init__(self, client_id: str, env: str, request_logging_config: Optional[RequestLoggingConfig] = None) -> None:
if hasattr(self, "client_id"):
raise RuntimeError("Apitally client is already initialized") # pragma: no cover
try:
Expand All @@ -56,6 +57,7 @@ def __init__(self, client_id: str, env: str) -> None:
self.validation_error_counter = ValidationErrorCounter()
self.server_error_counter = ServerErrorCounter()
self.consumer_registry = ConsumerRegistry()
self.request_logger = RequestLogger(request_logging_config)

self._startup_data: Optional[Dict[str, Any]] = None
self._startup_data_sent = False
Expand Down
59 changes: 50 additions & 9 deletions apitally/client/client_threading.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
from __future__ import annotations

import logging
import queue
import random
import time
from contextlib import suppress
from functools import partial
from io import BufferedReader
from queue import Queue
from threading import Event, Thread
from typing import Any, Callable, Dict, Optional, Tuple
from uuid import UUID

import backoff
import requests

from apitally.client.client_base import MAX_QUEUE_TIME, REQUEST_TIMEOUT, ApitallyClientBase
from apitally.client.logging import get_logger
from apitally.client.request_logging import RequestLoggingConfig


logger = get_logger(__name__)
Expand Down Expand Up @@ -43,11 +47,11 @@


class ApitallyClient(ApitallyClientBase):
def __init__(self, client_id: str, env: str) -> None:
super().__init__(client_id=client_id, env=env)
def __init__(self, client_id: str, env: str, request_logging_config: Optional[RequestLoggingConfig] = None) -> None:
super().__init__(client_id=client_id, env=env, request_logging_config=request_logging_config)
self._thread: Optional[Thread] = None
self._stop_sync_loop = Event()
self._sync_data_queue: queue.Queue[Tuple[float, Dict[str, Any]]] = queue.Queue()
self._sync_data_queue: Queue[Tuple[float, Dict[str, Any]]] = Queue()

def start_sync_loop(self) -> None:
self._stop_sync_loop.clear()
Expand All @@ -61,20 +65,29 @@
last_sync_time = 0.0
while not self._stop_sync_loop.is_set():
try:
now = time.time()
if (now - last_sync_time) >= self.sync_interval:
self.request_logger.write_to_file()
except Exception: # pragma: no cover
logger.exception("An error occurred while writing request logs")

now = time.time()
if (now - last_sync_time) >= self.sync_interval:
try:
with requests.Session() as session:
if not self._startup_data_sent and last_sync_time > 0: # not on first sync
self.send_startup_data(session)
self.send_sync_data(session)
self.send_log_data(session)
last_sync_time = now
time.sleep(1)
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")

self.request_logger.maintain()
time.sleep(1)
finally:
# Send any remaining data before exiting
with requests.Session() as session:
self.send_sync_data(session)
self.send_log_data(session)

def stop_sync_loop(self) -> None:
self._stop_sync_loop.set()
Expand Down Expand Up @@ -112,6 +125,23 @@
finally:
self._sync_data_queue.task_done()

def send_log_data(self, session: requests.Session) -> None:
self.request_logger.rotate_file()
i = 0
while log_file := self.request_logger.get_file():
if i > 0:
time.sleep(random.uniform(0.1, 0.3))

Check warning on line 133 in apitally/client/client_threading.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_threading.py#L133

Added line #L133 was not covered by tests
try:
with log_file.open_compressed() as fp:
self._send_log_data(session, log_file.uuid, fp)
log_file.delete()
except requests.RequestException:
self.request_logger.retry_file_later(log_file)
break

Check warning on line 140 in apitally/client/client_threading.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_threading.py#L138-L140

Added lines #L138 - L140 were not covered by tests
i += 1
if i >= 10:
break

Check warning on line 143 in apitally/client/client_threading.py

View check run for this annotation

Codecov / codecov/patch

apitally/client/client_threading.py#L143

Added line #L143 was not covered by tests

@retry(raise_on_giveup=False)
def _send_startup_data(self, session: requests.Session, data: Dict[str, Any]) -> None:
logger.debug("Sending startup data to Apitally hub")
Expand All @@ -126,6 +156,17 @@
response = session.post(url=f"{self.hub_url}/sync", json=data, timeout=REQUEST_TIMEOUT)
self._handle_hub_response(response)

def _send_log_data(self, session: requests.Session, uuid: UUID, fp: BufferedReader) -> None:
logger.debug("Streaming request log data to Apitally hub")
response = session.post(url=f"{self.hub_url}/log?uuid={uuid}", data=fp, timeout=REQUEST_TIMEOUT)
if response.status_code == 402 and "Retry-After" in response.headers:
with suppress(ValueError):
retry_after = int(response.headers["Retry-After"])
self.request_logger.suspend_until = time.time() + retry_after
self.request_logger.clear()
return
self._handle_hub_response(response)

def _handle_hub_response(self, response: requests.Response) -> None:
if response.status_code == 404:
self.stop_sync_loop()
Expand Down
Loading