diff --git a/src/sentry/auth/authenticators/u2f.py b/src/sentry/auth/authenticators/u2f.py index 79ba84e20c4a9b..fb3f24240ab5bf 100644 --- a/src/sentry/auth/authenticators/u2f.py +++ b/src/sentry/auth/authenticators/u2f.py @@ -1,6 +1,6 @@ import logging from base64 import urlsafe_b64encode -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta from functools import cached_property from time import time from urllib.parse import urlparse @@ -19,6 +19,7 @@ from sentry import options from sentry.auth.authenticators.base import EnrollmentStatus +from sentry.silo.base import SiloMode from sentry.utils import json from sentry.utils.dates import to_datetime from sentry.utils.decorators import classproperty @@ -56,24 +57,24 @@ def _valid_staff_timestamp(request, limit: timedelta = STAFF_AUTH_FLOW_MAX_AGE) if not timestamp: return False - flag_datetime = datetime.fromtimestamp(timestamp, timezone.utc) - current_time = datetime.now(timezone.utc) - time_difference = current_time - flag_datetime + flag_datetime = datetime.fromtimestamp(timestamp, UTC) + current_time = datetime.now(UTC) + time_difference = flag_datetime - current_time logger.info( "Validating staff timestamp", extra={ "user": request.user.id, "current_time": current_time, "flag_datetime": flag_datetime, - "time_difference": current_time - flag_datetime, - "limit": limit, - "boolean_check": time_difference > limit, + "time_difference": flag_datetime - current_time, + "boolean_check": timedelta(0) < time_difference < limit, + "active_silo": SiloMode.get_current_mode(), }, ) - if time_difference > limit: - return False - return True + # For a valid timestamp, the time difference must be positive (timestamp is in the future) + # and less than the limit (timestamp is within the valid limit, e.g. within the last 2 minutes) + return timedelta(0) < time_difference < limit class U2fInterface(AuthenticatorInterface): @@ -243,10 +244,11 @@ def activate(self, request: HttpRequest) -> ActivationChallengeResult: extra={ "user": request.user.id, "staff_flag": ( - datetime.utcfromtimestamp(request.session["staff_auth_flow"]) + datetime.fromtimestamp(request.session["staff_auth_flow"], UTC) if "staff_auth_flow" in request.session else "missing" ), + "active_silo": SiloMode.get_current_mode(), }, ) # It is an intentional decision to not check whether or not the staff @@ -263,12 +265,13 @@ def activate(self, request: HttpRequest) -> ActivationChallengeResult: extra={ "user": request.user.id, "staff_flag": ( - datetime.utcfromtimestamp(request.session["staff_auth_flow"]) + datetime.fromtimestamp(request.session["staff_auth_flow"], UTC) if "staff_auth_flow" in request.session else "missing" ), "has_state": "webauthn_authentication_state" in request.session, "has_staff_state": "staff_webauthn_authentication_state" in request.session, + "active_silo": SiloMode.get_current_mode(), }, ) return ActivationChallengeResult(challenge=cbor.encode(challenge["publicKey"])) @@ -282,12 +285,13 @@ def validate_response(self, request: HttpRequest, challenge, response): extra={ "user": request.user.id, "staff_flag": ( - datetime.utcfromtimestamp(request.session["staff_auth_flow"]) + datetime.fromtimestamp(request.session["staff_auth_flow"], UTC) if "staff_auth_flow" in request.session else "missing" ), "has_state": "webauthn_authentication_state" in request.session, "has_staff_state": "staff_webauthn_authentication_state" in request.session, + "active_silo": SiloMode.get_current_mode(), }, ) if _valid_staff_timestamp(request): @@ -298,7 +302,10 @@ def validate_response(self, request: HttpRequest, challenge, response): "webauthn_authentication_state" ): logger.info( - "Both staff and non-staff U2F states are set", extra={"user": request.user.id} + "Both staff and non-staff U2F states are set", + extra={ + "user": request.user.id, + }, ) self.webauthn_authentication_server.authenticate_complete( state=state, diff --git a/tests/sentry/auth/authenticators/test_u2f.py b/tests/sentry/auth/authenticators/test_u2f.py index c60732d2f1a22b..dd5067c90713f7 100644 --- a/tests/sentry/auth/authenticators/test_u2f.py +++ b/tests/sentry/auth/authenticators/test_u2f.py @@ -17,8 +17,9 @@ @control_silo_test class U2FInterfaceTest(TestCase): CURRENT_TIME = datetime(2024, 3, 11, 0, 0) - VALID_TIMESTAMP = (CURRENT_TIME - timedelta(minutes=1)).timestamp() - INVALID_TIMESTAMP = (CURRENT_TIME - timedelta(minutes=3)).timestamp() + VALID_TIMESTAMP = (CURRENT_TIME + timedelta(minutes=1)).timestamp() + INVALID_EXPIRED_TIMESTAMP = (CURRENT_TIME - timedelta(minutes=3)).timestamp() + INVALID_FUTURE_TIMESTAMP = (CURRENT_TIME + timedelta(minutes=3)).timestamp() def setUp(self): self.u2f = U2fInterface() @@ -98,7 +99,7 @@ def test_activate_staff_webauthn_valid_timestamp(self): def test_activate_staff_webauthn_invalid_timestamp(self): self.test_try_enroll_webauthn() - self.request.session["staff_auth_flow"] = self.INVALID_TIMESTAMP + self.request.session["staff_auth_flow"] = self.INVALID_EXPIRED_TIMESTAMP result = self.u2f.activate(self.request) @@ -141,14 +142,23 @@ def test_validate_response_staff_state_invalid_timestamp(self): mock_state = Mock() self.u2f.webauthn_authentication_server.authenticate_complete = mock_state + # Test expired timestamp self.request.session["webauthn_authentication_state"] = "non-staff state" - self.request.session["staff_auth_flow"] = self.INVALID_TIMESTAMP + self.request.session["staff_auth_flow"] = self.INVALID_EXPIRED_TIMESTAMP assert self.u2f.validate_response(self.request, None, self.response) _, kwargs = mock_state.call_args assert kwargs.get("state") == "non-staff state" assert "webauthn_authentication_state" not in self.request.session + # Test timestamp too far in the future + self.request.session["webauthn_authentication_state"] = "non-staff state" + self.request.session["staff_auth_flow"] = self.INVALID_FUTURE_TIMESTAMP + assert self.u2f.validate_response(self.request, None, self.response) + _, kwargs = mock_state.call_args + assert kwargs.get("state") == "non-staff state" + assert "webauthn_authentication_state" not in self.request.session + @freeze_time(CURRENT_TIME) def test_validate_response_failing_still_clears_all_states(self): self.test_try_enroll_webauthn()