diff --git a/apitally/client/request_logging.py b/apitally/client/request_logging.py index 8d03502..6ef28dd 100644 --- a/apitally/client/request_logging.py +++ b/apitally/client/request_logging.py @@ -35,6 +35,12 @@ r"/ready$", r"/live$", ] +EXCLUDE_USER_AGENT_PATTERNS = [ + r"health[_- ]?check", + r"microsoft-azure-application-lb", + r"googlehc", + r"kube-probe", +] MASK_QUERY_PARAM_PATTERNS = [ r"auth", r"api-?key", @@ -162,7 +168,12 @@ def log_request(self, request: RequestDict, response: ResponseDict) -> None: if not self.enabled or self.suspend_until is not None: return parsed_url = urlparse(request["url"]) - if self._should_exclude_path(request["path"] or parsed_url.path) or self._should_exclude(request, response): + user_agent = self._get_user_agent(request["headers"]) + if ( + self._should_exclude_path(request["path"] or parsed_url.path) + or self._should_exclude_user_agent(user_agent) + or self._should_exclude(request, response) + ): return query = self._mask_query_params(parsed_url.query) if self.config.log_query_params else "" @@ -271,6 +282,10 @@ def _should_exclude_path(self, url_path: str) -> bool: patterns = self.config.exclude_paths + EXCLUDE_PATH_PATTERNS return self._match_patterns(url_path, patterns) + @lru_cache(maxsize=1000) + def _should_exclude_user_agent(self, user_agent: Optional[str]) -> bool: + return self._match_patterns(user_agent, EXCLUDE_USER_AGENT_PATTERNS) if user_agent is not None else False + def _mask_query_params(self, query: str) -> str: query_params = parse_qsl(query) masked_query_params = [(k, v if not self._should_mask_query_param(k) else MASKED) for k, v in query_params] @@ -302,6 +317,10 @@ def _has_supported_content_type(headers: List[Tuple[str, str]]) -> bool: content_type = next((v for k, v in headers if k.lower() == "content-type"), None) return content_type is not None and any(content_type.startswith(t) for t in ALLOWED_CONTENT_TYPES) + @staticmethod + def _get_user_agent(headers: List[Tuple[str, str]]) -> Optional[str]: + return next((v for k, v in headers if k.lower() == "user-agent"), None) + def _check_writable_fs() -> bool: try: