Skip to content

Commit

Permalink
Remove dead batteries in core.utils
Browse files Browse the repository at this point in the history
  • Loading branch information
vaamb committed Nov 23, 2024
1 parent bde5d0f commit 2012aae
Showing 1 changed file with 16 additions and 119 deletions.
135 changes: 16 additions & 119 deletions src/ouranos/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,38 @@
from __future__ import annotations

import asyncio
import base64
import dataclasses
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta, timezone
from datetime import datetime, timedelta, timezone
import json as _json
import typing as t
from typing import Any
import uuid
import warnings

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import jwt
import orjson
from sqlalchemy import Row

from ouranos.core.exceptions import (
ExpiredTokenError, InvalidTokenError, TokenError)

try:
import orjson
except ImportError:
warnings.warn("Ouranos could be faster if orjson was installed")

def _serializer(self, o: Any) -> dict | str:
if isinstance(o, datetime):
return o.astimezone(tz=timezone.utc).isoformat(timespec="seconds")
if isinstance(o, date):
return o.isoformat()
if isinstance(o, time):
return (
datetime.combine(date.today(), o).astimezone(tz=timezone.utc)
.isoformat(timespec="seconds")
)
if isinstance(o, uuid.UUID):
return str(o)
if isinstance(o, Row):
return o.tuple() # return a tuple
# return {**o._mapping} # return a dict
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
if hasattr(o, "__html__"):
return str(o.__html__())
return _json.JSONEncoder.default(self, o)

class json:
@staticmethod
def dumps(obj) -> bytes:
return _json.dumps(obj, default=_serializer).encode("utf8")

@staticmethod
def loads(obj) -> t.Any:
return _json.loads(obj)
def _serializer(self, o: Any) -> dict | str:
if isinstance(o, Row):
return o.tuple() # return a tuple
# return {**o._mapping} # return a dict
if hasattr(o, "__html__"):
return str(o.__html__())
return _json.JSONEncoder.default(self, o)

else:
def _serializer(self, o: Any) -> dict | str:
if isinstance(o, Row):
return o.tuple() # return a tuple
# return {**o._mapping} # return a dict
if hasattr(o, "__html__"):
return str(o.__html__())
return _json.JSONEncoder.default(self, o)

class json:
@staticmethod
def dumps(obj) -> bytes:
return orjson.dumps(obj, default=_serializer)
class json:
@staticmethod
def dumps(obj) -> bytes:
return orjson.dumps(obj, default=_serializer)

@staticmethod
def loads(obj) -> t.Any:
return orjson.loads(obj)
@staticmethod
def loads(obj) -> t.Any:
return orjson.loads(obj)


def setup_loop():
Expand Down Expand Up @@ -215,70 +176,6 @@ def create_token(
return Tokenizer.dumps(payload)


def time_to_datetime(_time: time | None) -> datetime | None:
# return _time in case it is None
if not isinstance(_time, time):
return _time
return datetime.combine(date.today(), _time, tzinfo=timezone.utc)


def try_iso_format(time_obj: time | None) -> str | None:
if time_obj is not None:
return time_to_datetime(time_obj).isoformat()
return None


def decrypt_uid(encrypted_uid: str, secret_key: str = None) -> str:
from ouranos import current_app
secret_key = secret_key or current_app.config["OURANOS_CONNECTION_KEY"]
if not secret_key:
raise RuntimeError(
"Either provide a `secret_key` or setup `CONNECTION_KEY` in config "
"file or `OURANOS_CONNECTION_KEY` in the environment"
)
h = hashes.Hash(hashes.SHA256())
h.update(secret_key.encode("utf-8"))
key = base64.urlsafe_b64encode(h.finalize())
f = Fernet(key=key)
try:
return f.decrypt(encrypted_uid.encode("utf-8")).decode("utf-8")
except InvalidToken:
return ""


def validate_uid_token(token: str, manager_uid: str) -> bool:
iterations = int(token.split("$")[0].split(":")[2])
ssalt = token.split("$")[1]
token_key = token.split("$")[2]
bsalt = ssalt.encode("UTF-8")

kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=bsalt,
iterations=iterations,
)
bkey = kdf.derive(manager_uid.encode())
hkey = base64.b64encode(bkey).hex()
return token_key == hkey


def generate_secret_key_from_password(password: str | bytes) -> str:
if isinstance(password, str):
pwd = password.encode("utf-8")
else:
pwd = password
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b"",
iterations=2**21,
)
bkey = kdf.derive(pwd)
skey = base64.b64encode(bkey).decode("utf-8").strip("=")
return skey


def stripped_warning(msg):
def custom_format_warning(_msg, *args, **kwargs):
return str(_msg) + '\n'
Expand Down

0 comments on commit 2012aae

Please sign in to comment.