Skip to content

Commit

Permalink
add support for bearer token and loading token from filesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfv committed Jan 2, 2024
1 parent 3d4560f commit ad83c73
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
48 changes: 46 additions & 2 deletions src/quetz_client/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,50 @@
import os
from urllib.parse import urlparse
import json
from typing import Optional, cast

import fire
from requests.adapters import HTTPAdapter, Retry

from quetz_client.client import QuetzClient

class BearerToken:
def __init__(self, token: str):
self.token = token

def __str__(self):
return self.token

class ApiKey:
def __init__(self, token: str):
self.token = token

def __str__(self):
return self.token

def get_auth_token(url: str, token: str) -> str:
token = cast(str, token or os.getenv("QUETZ_API_KEY", ""))
if token:
return ApiKey(token)
elif os.getenv("QUETZ_BEARER_TOKEN"):
return BearerToken(os.getenv("QUETZ_BEARER_TOKEN"))

# open the authentication file and read the token
with open(os.path.expanduser("~/.mamba/auth/authentication.json")) as f:
auth = json.load(f)

# check if the host is present in the authentication file
host = urlparse(url).netloc
if host not in auth:
return None

if host in auth:
credentials = auth[host]

if credentials["type"] == "BearerToken":
return BearerToken(credentials["token"])
elif credentials["type"] == "ApiKey":
return ApiKey(credentials["token"])

def get_client(
*,
Expand Down Expand Up @@ -37,8 +76,13 @@ def get_client(
# Initialize the client (do not force the env variables to be set of help on the
# subcommands does not work without setting them)
url = cast(str, url or os.getenv("QUETZ_SERVER_URL", ""))
token = cast(str, token or os.getenv("QUETZ_API_KEY", ""))
client = QuetzClient.from_token(url, token)

token = get_auth_token(url, token)

if isinstance(token, ApiKey):
client = QuetzClient.from_token(url, str(token))
else:
client = QuetzClient.from_bearer_token(url, str(token))

# Configure the client with additional flags passed to the CLI
client.session.verify = not insecure
Expand Down
6 changes: 6 additions & 0 deletions src/quetz_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def from_token(cls, url: str, token: str) -> "QuetzClient":
session = requests.Session()
session.headers.update({"X-API-Key": token})
return cls(session, url=url)

@classmethod
def from_bearer_token(cls, url: str, token: str) -> "QuetzClient":
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {token}"})
return cls(session, url=url)

def _yield_paginated(
self, url: str, params: Dict[str, Union[str, int]], limit: int = 20
Expand Down

0 comments on commit ad83c73

Please sign in to comment.