Skip to content

Commit

Permalink
Fix kubeconfig with no clusters (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored Apr 24, 2024
1 parent 0c86f3e commit 6dba956
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 17 deletions.
10 changes: 8 additions & 2 deletions kr8s/_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ async def _load_kubeconfig(self) -> None:
self._context = self.kubeconfig.contexts[0]["context"]
self.active_context = self.kubeconfig.contexts[0]["name"]

# Load configuration options from the context
if self.namespace is None:
self.namespace = self.kubeconfig.current_namespace

# If no cluster is found in the context, assume it's a service account
if not self._context["cluster"]:
return

self._cluster = self.kubeconfig.get_cluster(self._context["cluster"])
self._user = self.kubeconfig.get_user(self._context["user"])
self.server = self._cluster["server"]
Expand Down Expand Up @@ -205,8 +213,6 @@ async def _load_kubeconfig(self) -> None:
"username/password authentication was removed in Kubernetes 1.19, "
"kr8s doesn't not support this Kubernetes version"
)
if self.namespace is None:
self.namespace = self.kubeconfig.current_namespace
if "auth-provider" in self._user:
if p := self._user["auth-provider"]["name"] != "oidc":
raise ValueError(
Expand Down
53 changes: 38 additions & 15 deletions kr8s/_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
from typing import Dict, List
from typing import Dict, List, Union

import anyio
import yaml
Expand All @@ -18,8 +18,11 @@


class KubeConfigSet(object):
def __init__(self, *paths):
self._configs = [KubeConfig(path) for path in paths]
def __init__(self, *paths_or_dicts: Union[List[str], List[Dict]]):
if isinstance(paths_or_dicts[0], str):
self._configs = [KubeConfig(path) for path in paths_or_dicts]
else:
self._configs = [KubeConfig(config) for config in paths_or_dicts]

def __await__(self):
async def f():
Expand Down Expand Up @@ -130,52 +133,72 @@ def preferences(self) -> Dict:

@property
def clusters(self) -> List[Dict]:
clusters = [cluster for config in self._configs for cluster in config.clusters]
clusters = []
for config in self._configs:
if config.clusters:
clusters.extend(config.clusters)
# Unpack and repack to remove duplicates
clusters = list_dict_unpack(clusters, "name", "cluster")
clusters = dict_list_pack(clusters, "name", "cluster")
return clusters

@property
def users(self) -> List[Dict]:
users = [user for config in self._configs for user in config.users]
users = []
for config in self._configs:
if config.users:
users.extend(config.users)
# Unpack and repack to remove duplicates
users = list_dict_unpack(users, "name", "user")
users = dict_list_pack(users, "name", "user")
return users

@property
def contexts(self) -> List[Dict]:
contexts = [context for config in self._configs for context in config.contexts]
contexts = []
for config in self._configs:
if config.contexts:
contexts.extend(config.contexts)
# Unpack and repack to remove duplicates
contexts = list_dict_unpack(contexts, "name", "context")
contexts = dict_list_pack(contexts, "name", "context")
return contexts

@property
def extensions(self) -> List[Dict]:
return [
extension for config in self._configs for extension in config.extensions
]
extensions = []
for config in self._configs:
if config.extensions:
extensions.extend(config.extensions)
return extensions


class KubeConfig(object):
def __init__(self, path):
self.path = path
def __init__(self, path_or_config: Union[str, Dict]):
self.path = None
self._raw = None
if isinstance(path_or_config, str):
self.path = path_or_config
else:
self._raw = path_or_config

self.__write_lock = anyio.Lock()

def __await__(self):
async def f():
async with await anyio.open_file(self.path) as fh:
self._raw = yaml.safe_load(await fh.read())
if not self._raw:
async with await anyio.open_file(self.path) as fh:
self._raw = yaml.safe_load(await fh.read())
return self

return f().__await__()

async def save(self) -> None:
async def save(self, path=None) -> None:
path = self.path if not path else path
if not path:
raise ValueError("No path provided")
async with self.__write_lock:
async with await anyio.open_file(self.path, "w") as fh:
async with await anyio.open_file(path, "w") as fh:
await fh.write(yaml.safe_dump(self._raw))

@property
Expand Down
28 changes: 28 additions & 0 deletions kr8s/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import yaml

import kr8s
from kr8s._config import KubeConfig
from kr8s._testutils import set_env

HERE = Path(__file__).parent.resolve()
Expand Down Expand Up @@ -166,6 +167,33 @@ async def test_service_account(serviceaccount):
assert api.auth.namespace == (serviceaccount / "namespace").read_text()


async def test_service_account_with_kubeconfig_namespace(serviceaccount):
kubeconfig = await KubeConfig(
{
"apiVersion": "v1",
"clusters": None,
"contexts": [
{
"context": {"cluster": "", "namespace": "bar", "user": ""},
"name": "foo",
}
],
"current-context": "foo",
"kind": "Config",
"preferences": {},
"users": None,
}
)
kubeconfig_path = str(Path(serviceaccount) / "kubeconfig")
await kubeconfig.save(path=kubeconfig_path)
api = await kr8s.asyncio.api(
serviceaccount=serviceaccount, kubeconfig=kubeconfig_path
)

assert api.auth.server
assert api.namespace == "bar"


async def test_exec(kubeconfig_with_exec):
api = await kr8s.asyncio.api(kubeconfig=kubeconfig_with_exec)
assert await api.get("pods", namespace=kr8s.ALL)
Expand Down
9 changes: 9 additions & 0 deletions kr8s/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from tempfile import NamedTemporaryFile

import pytest
import yaml

from kr8s._config import KubeConfig, KubeConfigSet

Expand Down Expand Up @@ -43,6 +44,14 @@ async def test_load_kubeconfig_set(temp_kubeconfig):
assert len(configs.contexts) > 0


@pytest.mark.parametrize("cls", [KubeConfig, KubeConfigSet])
async def test_kubeconfig_from_dict(temp_kubeconfig, cls):
with open(temp_kubeconfig) as fh:
config = yaml.safe_load(fh)
kubeconfig = await cls(config)
assert kubeconfig.raw == config


@pytest.mark.parametrize("cls", [KubeConfig, KubeConfigSet])
async def test_rename_context(temp_kubeconfig, cls):
config = await cls(temp_kubeconfig)
Expand Down

0 comments on commit 6dba956

Please sign in to comment.