diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..1d221b1
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,62 @@
+[tool.ruff]
+# Exclude a variety of commonly ignored directories.
+exclude = [
+    ".bzr",
+    ".direnv",
+    ".eggs",
+    ".git",
+    ".git-rewrite",
+    ".hg",
+    ".ipynb_checkpoints",
+    ".mypy_cache",
+    ".nox",
+    ".pants.d",
+    ".pyenv",
+    ".pytest_cache",
+    ".pytype",
+    ".ruff_cache",
+    ".svn",
+    ".tox",
+    ".venv",
+    ".vscode",
+    "__pypackages__",
+    "_build",
+    "buck-out",
+    "build",
+    "dist",
+    "node_modules",
+    "site-packages",
+    "venv",
+]
+
+# Wider than Black's default of 88.
+line-length = 150
+indent-width = 4
+
+# Assume Python 3.7
+target-version = "py37"
+
+[tool.ruff.lint]
+# Enable Pyflakes (`F`) and pycodestyle (`E`) codes by default.
+select = ["E", "F"]
+ignore = []
+
+# Allow fix for all enabled rules (when `--fix` is provided).
+fixable = ["ALL"]
+unfixable = []
+
+# Allow unused variables when underscore-prefixed.
+dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
+
+[tool.ruff.format]
+# Like Black, use double quotes for strings.
+quote-style = "double"
+
+# Like Black, indent with spaces, rather than tabs.
+indent-style = "space"
+
+# Like Black, respect magic trailing commas.
+skip-magic-trailing-comma = false
+
+# Like Black, automatically detect the appropriate line ending.
+line-ending = "auto"
\ No newline at end of file
diff --git a/python-clusters/attach-eks-cluster/cluster.py b/python-clusters/attach-eks-cluster/cluster.py
index dbf8bdf..03e1006 100644
--- a/python-clusters/attach-eks-cluster/cluster.py
+++ b/python-clusters/attach-eks-cluster/cluster.py
@@ -1,4 +1,6 @@
-import os, sys, json, subprocess, time, logging, yaml
+import os
+import json
+import yaml
 import dku_utils.tools_version
 
 from dataiku.cluster import Cluster
@@ -9,6 +11,7 @@ from dku_utils.config_parser import get_region_arg
 from dku_utils.tools_version import get_kubectl_version, kubectl_should_use_beta_apiVersion
 
+
 class MyCluster(Cluster):
     def __init__(self, cluster_id, cluster_name, config, plugin_config, global_settings):
         self.cluster_id = cluster_id
@@ -19,24 +22,24 @@ def __init__(self, cluster_id, cluster_name, config, plugin_config, global_setti
     def start(self):
         dku_utils.tools_version.check_versions()
-        cluster_id = self.config['clusterId']
-        
+        cluster_id = self.config["clusterId"]
+
         # retrieve the cluster info from EKS
         # this will fail if the cluster doesn't exist, but the API message is enough
         connection_info = get_connection_info(self.config)
-        
-        args = ['get', 'cluster']
-        args = args + ['--name', cluster_id]
+
+        args = ["get", "cluster"]
+        args = args + ["--name", cluster_id]
         args = args + get_region_arg(connection_info)
-        args = args + ['-o', 'json']
+        args = args + ["-o", "json"]
 
         c = EksctlCommand(args, connection_info)
         cluster_info = json.loads(c.run_and_get_output())[0]
 
         kubectl_version = get_kubectl_version()
-        apiVersion_to_use = 'v1beta1' if kubectl_should_use_beta_apiVersion(kubectl_version) else 'v1alpha1'
+        apiVersion_to_use = "v1beta1" if kubectl_should_use_beta_apiVersion(kubectl_version) else "v1alpha1"
 
         kube_config_str = """
 apiVersion: v1
@@ -64,14 +67,15 @@ def start(self):
         - %s
         command: aws-iam-authenticator
         env: null
-        """ % (cluster_info['CertificateAuthority']['Data'], cluster_info['Endpoint'], apiVersion_to_use, cluster_id)
-        kube_config_str = kube_config_str.replace("__CLUSTER_ID__", cluster_id) # cluster_id is as good as anything, since this kubeconfig won't be merged into another one
+        """ % 
(cluster_info["CertificateAuthority"]["Data"], cluster_info["Endpoint"], apiVersion_to_use, cluster_id) + # cluster_id is as good as anything, since this kubeconfig won't be merged into another one + kube_config_str = kube_config_str.replace("__CLUSTER_ID__", cluster_id) # build the config file for kubectl # we don't add the context to the main config file, to not end up with an oversized config, # and because 2 different clusters could be concurrently editing the config file - kube_config_path = os.path.join(os.getcwd(), 'kube_config') - with open(kube_config_path, 'w') as f: + kube_config_path = os.path.join(os.getcwd(), "kube_config") + with open(kube_config_path, "w") as f: f.write(kube_config_str) setup_creds_env(kube_config_path, connection_info, self.config) @@ -80,7 +84,7 @@ def start(self): # collect and prepare the overrides so that DSS can know where and how to use the cluster overrides = make_overrides(self.config, kube_config, kube_config_path) - return [overrides, {'kube_config_path':kube_config_path, 'cluster':cluster_info}] + return [overrides, {"kube_config_path": kube_config_path, "cluster": cluster_info}] def stop(self, data): pass diff --git a/python-clusters/create-eks-cluster/cluster.py b/python-clusters/create-eks-cluster/cluster.py index ecf2f61..5fc78fd 100644 --- a/python-clusters/create-eks-cluster/cluster.py +++ b/python-clusters/create-eks-cluster/cluster.py @@ -1,4 +1,9 @@ -import os, sys, json, subprocess, time, logging, yaml, threading +import os +import json +import time +import logging +import yaml +import threading import dku_utils.tools_version from dataiku.cluster import Cluster @@ -15,6 +20,7 @@ from dku_utils.node_pool import get_node_pool_yaml from dku_utils.taints import Taint + class MyCluster(Cluster): def __init__(self, cluster_id, cluster_name, config, plugin_config, global_settings): self.cluster_id = cluster_id @@ -29,92 +35,92 @@ def start(self): networking_settings = self.config["networkingSettings"] has_autoscaling = False - + attach_vm_to_security_groups = False - injected_security_group = self.config.get('injectedSG', '').strip() + injected_security_group = self.config.get("injectedSG", "").strip() k8s_version = self.config.get("k8sVersion", None) autoscaled_node_pools_taints = None gpu_node_pools_taints = None - - if self.config.get('advanced', False): - has_autoscaling = self.config.get('clusterAutoScaling') + + if self.config.get("advanced", False): + has_autoscaling = self.config.get("clusterAutoScaling") has_gpu = self.config.get("advancedGPU") # create the cluster directly from a yaml def yaml_dict = yaml.safe_load(self.config.get("advancedYaml")) else: - node_pools = self.config.get('nodePools', []) - node_pool = self.config.get('nodePool', {}) + node_pools = self.config.get("nodePools", []) + node_pool = self.config.get("nodePool", {}) gpu_node_pools_taints = set() autoscaled_node_pools_taints = set() if node_pool: node_pools.append(node_pool) - has_autoscaling = any(node_pool.get('numNodesAutoscaling', False) for node_pool in node_pools) - has_gpu = any(node_pool.get('enableGPU', False) for node_pool in node_pools) + has_autoscaling = any(node_pool.get("numNodesAutoscaling", False) for node_pool in node_pools) + has_gpu = any(node_pool.get("enableGPU", False) for node_pool in node_pools) # build the yaml def. 
As a first step we run eksctl with # as many command line args as possible to get it to produce # a good base for the cluster yaml def, then we spice it up - # according to the settings that don't have a command-line + # according to the settings that don't have a command-line # arg - args = ['create', 'cluster'] - args = args + ['-v', '3'] # not -v 4 otherwise there is a debug line in the beginning of the output - args = args + ['--name', self.cluster_id] + args = ["create", "cluster"] + args = args + ["-v", "3"] # not -v 4 otherwise there is a debug line in the beginning of the output + args = args + ["--name", self.cluster_id] args = args + get_region_arg(connection_info) - args = args + ['--full-ecr-access'] + args = args + ["--full-ecr-access"] - subnets = list(map(lambda subnet_id: subnet_id.strip(), networking_settings.get('subnets', []))) - if networking_settings.get('privateNetworking', False): - private_subnets = list(map(lambda private_subnet_id: private_subnet_id.strip(), networking_settings.get('privateSubnets', []))) + subnets = list(map(lambda subnet_id: subnet_id.strip(), networking_settings.get("subnets", []))) + if networking_settings.get("privateNetworking", False): + private_subnets = list(map(lambda private_subnet_id: private_subnet_id.strip(), networking_settings.get("privateSubnets", []))) if len(private_subnets) > 0: - args = args + ['--vpc-private-subnets', ','.join(private_subnets)] + args = args + ["--vpc-private-subnets", ",".join(private_subnets)] if len(subnets) > 0: - args = args + ['--vpc-public-subnets', ','.join(subnets)] + args = args + ["--vpc-public-subnets", ",".join(subnets)] # EKSCTL does not support creating more than one node group using CLI arguments # So we generate the configuration for the cluster without node groups and we add them later to the yaml config - args += ['--without-nodegroup'] + args += ["--without-nodegroup"] if not _is_none_or_blank(k8s_version): - args = args + ['--version', k8s_version.strip()] + args = args + ["--version", k8s_version.strip()] c = EksctlCommand(args + ["--dry-run"], connection_info) yaml_spec = c.run_and_get_output() logging.info("Got spec:\n%s" % yaml_spec) - + yaml_dict = yaml.safe_load(yaml_spec) # Once we generated the yaml configuration for the cluster, we can add the required specs for each node group # and do a second dry-run with the initial generated configuration file. 
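        # ---------------------------------------------------------------
        # Editor's note: a minimal sketch of this two-pass --dry-run
        # pattern, assuming only a plain `eksctl` binary on the PATH
        # (the helper name below is illustrative, not part of this plugin):
        #
        #     import subprocess
        #     import yaml
        #
        #     def render_cluster_config(cli_args):
        #         # first pass: eksctl prints a full ClusterConfig YAML and creates nothing
        #         out = subprocess.check_output(["eksctl", "create", "cluster", "--dry-run"] + cli_args, universal_newlines=True)
        #         return yaml.safe_load(out)
        #
        #     base = render_cluster_config(["--name", "my-cluster", "--without-nodegroup"])
        #     base.setdefault("managedNodeGroups", []).append({"name": "ng-0", "desiredCapacity": 3})
        #     # second pass: validate the enriched spec before really creating it
        #     with open("config.yaml", "w") as f:
        #         yaml.dump(base, f)
        #     subprocess.check_call(["eksctl", "create", "cluster", "--dry-run", "-f", "config.yaml"])
        # ---------------------------------------------------------------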
if node_pools: - yaml_dict['managedNodeGroups'] = yaml_dict.get('managedNodeGroups', []) + yaml_dict["managedNodeGroups"] = yaml_dict.get("managedNodeGroups", []) for idx, node_pool in enumerate(node_pools, 0): if node_pool: yaml_node_pool = get_node_pool_yaml(node_pool, networking_settings) - yaml_node_pool['name'] = node_pool.get('nodeGroupId', "%s-ng-%s" % (self.cluster_id, idx)) - yaml_dict['managedNodeGroups'].append(yaml_node_pool) + yaml_node_pool["name"] = node_pool.get("nodeGroupId", "%s-ng-%s" % (self.cluster_id, idx)) + yaml_dict["managedNodeGroups"].append(yaml_node_pool) # Keep track of all the GPU enabled or autoscaled node pool taints (without duplicates) - if node_pool.get('enableGPU', False) or node_pool.get('numNodesAutoscaling'): - current_node_pool_taints = yaml_node_pool.get('taints', []) + if node_pool.get("enableGPU", False) or node_pool.get("numNodesAutoscaling"): + current_node_pool_taints = yaml_node_pool.get("taints", []) for taint in current_node_pool_taints: new_taint = Taint(taint) - if node_pool.get('enableGPU', False): + if node_pool.get("enableGPU", False): gpu_node_pools_taints.add(new_taint) else: autoscaled_node_pools_taints.add(new_taint) - yaml_node_pool_loc = os.path.join(os.getcwd(), self.cluster_id +'_config_with_node_pools.yaml') - with open(yaml_node_pool_loc, 'w') as outfile: + yaml_node_pool_loc = os.path.join(os.getcwd(), self.cluster_id + "_config_with_node_pools.yaml") + with open(yaml_node_pool_loc, "w") as outfile: yaml.dump(yaml_dict, outfile, default_flow_style=False) - args = ['create', 'cluster'] - args += ['-v', '3'] # not -v 4 otherwise there is a debug line in the beginning of the output - args += ['-f', yaml_node_pool_loc] + args = ["create", "cluster"] + args += ["-v", "3"] # not -v 4 otherwise there is a debug line in the beginning of the output + args += ["-f", yaml_node_pool_loc] c = EksctlCommand(args + ["--dry-run"], connection_info) yaml_spec = c.run_and_get_output() @@ -122,59 +128,59 @@ def start(self): yaml_dict = yaml.safe_load(yaml_spec) - if self.config.get('privateCluster', False): + if self.config.get("privateCluster", False): logging.info("Making the cluster fully-private") - - private_cluster = yaml_dict.get('privateCluster', {}) - yaml_dict['privateCluster'] = private_cluster - private_cluster['enabled'] = True - if self.config.get('skipEndpointCreation', False): - private_cluster['skipEndpointCreation'] = True + + private_cluster = yaml_dict.get("privateCluster", {}) + yaml_dict["privateCluster"] = private_cluster + private_cluster["enabled"] = True + if self.config.get("skipEndpointCreation", False): + private_cluster["skipEndpointCreation"] = True else: - private_cluster['skipEndpointCreation'] = False + private_cluster["skipEndpointCreation"] = False if has_autoscaling: - private_cluster["additionalEndpointServices"] = private_cluster.get('additionalEndpointServices', []) - if not 'autoscaling' in private_cluster["additionalEndpointServices"]: - private_cluster["additionalEndpointServices"].append('autoscaling') - - # clear the vpc.clusterEndpoints - yaml_dict['vpc'] = yaml_dict.get('vpc', {}) - yaml_dict['vpc']['clusterEndpoints'] = None - + private_cluster["additionalEndpointServices"] = private_cluster.get("additionalEndpointServices", []) + if "autoscaling" not in private_cluster["additionalEndpointServices"]: + private_cluster["additionalEndpointServices"].append("autoscaling") + + # clear the vpc.clusterEndpoints + yaml_dict["vpc"] = yaml_dict.get("vpc", {}) + yaml_dict["vpc"]["clusterEndpoints"] = None + 
# make sure we have a security group to use as shared security group # the issue being that eksctl puts this guy on the private VPC endpoints - # and if you don't control it, then the DSS VM will have no access to the + # and if you don't control it, then the DSS VM will have no access to the # endpoints, and eksctl will start failing on calls to EC2 - control_plane_security_group = networking_settings.get('controlPlaneSG', '').strip() - shared_security_group = networking_settings.get('sharedSG', '').strip() + control_plane_security_group = networking_settings.get("controlPlaneSG", "").strip() + shared_security_group = networking_settings.get("sharedSG", "").strip() if len(control_plane_security_group) > 0: - yaml_dict['vpc']['securityGroup'] = control_plane_security_group + yaml_dict["vpc"]["securityGroup"] = control_plane_security_group elif len(shared_security_group) > 0: - yaml_dict['vpc']['sharedNodeSecurityGroup'] = shared_security_group - elif self.config.get('privateCluster', False): - # we'll need to make eksctl able to reach the stuff bearing the + yaml_dict["vpc"]["sharedNodeSecurityGroup"] = shared_security_group + elif self.config.get("privateCluster", False): + # we'll need to make eksctl able to reach the stuff bearing the # SG created by eksctl attach_vm_to_security_groups = True # whatever the setting, make the cluster from the yaml config - yaml_loc = os.path.join(os.getcwd(), self.cluster_id +'_config.yaml') - with open(yaml_loc, 'w') as outfile: + yaml_loc = os.path.join(os.getcwd(), self.cluster_id + "_config.yaml") + with open(yaml_loc, "w") as outfile: yaml.dump(yaml_dict, outfile, default_flow_style=False) logging.info("Final spec\n%s" % yaml.dump(yaml_dict)) - args = ['create', 'cluster'] - args = args + ['-v', '4'] - args = args + ['-f', yaml_loc] - + args = ["create", "cluster"] + args = args + ["-v", "4"] + args = args + ["-f", yaml_loc] + # According to EKSCTL documentation: https://eksctl.io/usage/gpu-support/ # Unless this flag is present, they will automatically install the Nvidia plugin # We add it so that we can control the version of the plugin that is installed. 
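        # ---------------------------------------------------------------
        # Editor's note: with auto-install disabled below, a pinned plugin
        # version can later be applied explicitly; a sketch, assuming a
        # kubectl on the PATH (the tag is an example only, check the
        # NVIDIA/k8s-device-plugin releases for the one you want):
        #
        #     import subprocess
        #     tag = "v0.14.1"  # illustrative version pin
        #     manifest = ("https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/"
        #                 "%s/deployments/static/nvidia-device-plugin.yml" % tag)
        #     subprocess.check_call(["kubectl", "apply", "-f", manifest])
        # ---------------------------------------------------------------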
- args += ['--install-nvidia-plugin=false'] + args += ["--install-nvidia-plugin=false"] # we don't add the context to the main config file, to not end up with an oversized config, # and because 2 different clusters could be concurrently editing the config file - kube_config_path = os.path.join(os.getcwd(), 'kube_config') - args = args + ['--kubeconfig', kube_config_path] + kube_config_path = os.path.join(os.getcwd(), "kube_config") + args = args + ["--kubeconfig", kube_config_path] # if a previous kubeconfig exists, it will be merged with the current configuration, possibly keeping unwanted configuration # deleting it ensures a coherent configuration for the cluster @@ -189,17 +195,17 @@ def add_vm_to_sg(): stack_name = None # first pester eksctl until it can give the stack name # (this is normally when the EKS cluster object is ready) - stack_name_args = ['utils', 'describe-stacks'] - stack_name_args = stack_name_args + ['--cluster', self.cluster_id] + stack_name_args = ["utils", "describe-stacks"] + stack_name_args = stack_name_args + ["--cluster", self.cluster_id] stack_name_args = stack_name_args + get_region_arg(connection_info) - stack_name_args = stack_name_args + ['--output', 'json'] + stack_name_args = stack_name_args + ["--output", "json"] while stack_name is None: time.sleep(5) try: stack_name_c = EksctlCommand(stack_name_args, connection_info) stack_spec = stack_name_c.run_and_get_output() stack_name = json.loads(stack_spec)[0]["StackName"] - except: + except Exception: logging.info("Not yet able to get stack name") logging.info("Stack name is %s" % stack_name) # then describe the stack resources to get the shared sg. It should be ready @@ -207,34 +213,34 @@ def add_vm_to_sg(): # done starting, and that's too late for eksctl) sg_ids = [] for resource_id in ["ControlPlaneSecurityGroup", "ClusterSharedNodeSecurityGroup"]: - describe_resource_args = ['cloudformation', 'describe-stack-resource'] + describe_resource_args = ["cloudformation", "describe-stack-resource"] describe_resource_args = describe_resource_args + get_region_arg(connection_info) - describe_resource_args = describe_resource_args + ['--stack-name', stack_name] - describe_resource_args = describe_resource_args + ['--logical-resource-id', resource_id] + describe_resource_args = describe_resource_args + ["--stack-name", stack_name] + describe_resource_args = describe_resource_args + ["--logical-resource-id", resource_id] describe_resource_c = AwsCommand(describe_resource_args, connection_info) try: - describe_resource = json.loads(describe_resource_c.run_and_get_output()).get('StackResourceDetail', {}) + describe_resource = json.loads(describe_resource_c.run_and_get_output()).get("StackResourceDetail", {}) sg_id = describe_resource.get("PhysicalResourceId", None) logging.info("%s SG is %s" % (resource_id, sg_id)) if sg_id is not None and sg_id != injected_security_group: sg_ids.append(sg_id) - except: + except Exception: logging.warn("Not able to get SG id for %s" % resource_id) - + # attach a rule to the shared SG so that the DSS VM can access it (and the VPC endpoints that use it) if len(injected_security_group) > 0: - inbound = ['--source-group', injected_security_group] + inbound = ["--source-group", injected_security_group] else: # if no sg has been given for the VM, use a CIDR with an IP private_ip = get_private_ip_from_metadata() - inbound = ['--cidr', "%s/32" % private_ip] - + inbound = ["--cidr", "%s/32" % private_ip] + logging.info("Add SG=%s to inbound of SG" % injected_security_group) for sg_id in sg_ids: 
- add_sg_rule_args = ['ec2', 'authorize-security-group-ingress'] + add_sg_rule_args = ["ec2", "authorize-security-group-ingress"] add_sg_rule_args = add_sg_rule_args + get_region_arg(connection_info) - add_sg_rule_args = add_sg_rule_args + ['--group-id', sg_id] - add_sg_rule_args = add_sg_rule_args + ['--protocol', "all"] + add_sg_rule_args = add_sg_rule_args + ["--group-id", sg_id] + add_sg_rule_args = add_sg_rule_args + ["--protocol", "all"] add_sg_rule_args = add_sg_rule_args + inbound add_sg_rule_c = AwsCommand(add_sg_rule_args, connection_info) if add_sg_rule_c.run_and_log() != 0: @@ -243,26 +249,26 @@ def add_vm_to_sg(): t = threading.Thread(target=add_vm_to_sg) t.daemon = True t.start() - + c = EksctlCommand(args, connection_info) if c.run_and_log() != 0: raise Exception("Failed to start cluster") # if you leave eksctl work, you have a public/private EKS endpoint, so we can tighten it even more - if self.config.get('makePrivateOnly', False): - privatize_args = ['utils', 'update-cluster-endpoints'] - privatize_args = privatize_args + ['--name', self.cluster_id] - privatize_args = privatize_args + ['--private-access=true', '--public-access=false'] - privatize_args = privatize_args + ['--approve'] + if self.config.get("makePrivateOnly", False): + privatize_args = ["utils", "update-cluster-endpoints"] + privatize_args = privatize_args + ["--name", self.cluster_id] + privatize_args = privatize_args + ["--private-access=true", "--public-access=false"] + privatize_args = privatize_args + ["--approve"] privatize_args = privatize_args + get_region_arg(connection_info) privatize_c = EksctlCommand(privatize_args, connection_info) if privatize_c.run_and_log() != 0: raise Exception("Failed to make cluster fully private") - args = ['get', 'cluster'] - args = args + ['--name', self.cluster_id] + args = ["get", "cluster"] + args = args + ["--name", self.cluster_id] args = args + get_region_arg(connection_info) - args = args + ['-o', 'json'] + args = args + ["-o", "json"] setup_creds_env(kube_config_path, connection_info, self.config) @@ -271,7 +277,7 @@ def add_vm_to_sg(): gpu_taints = list(gpu_node_pools_taints) if gpu_node_pools_taints else [] add_gpu_driver_if_needed(self.cluster_id, kube_config_path, connection_info, gpu_taints) - if self.config.get('installMetricsServer'): + if self.config.get("installMetricsServer"): install_metrics_server(kube_config_path) c = EksctlCommand(args, connection_info) @@ -287,14 +293,14 @@ def add_vm_to_sg(): # collect and prepare the overrides so that DSS can know where and how to use the cluster overrides = make_overrides(self.config, kube_config, kube_config_path) - return [overrides, {'kube_config_path':kube_config_path, 'cluster':cluster_info}] + return [overrides, {"kube_config_path": kube_config_path, "cluster": cluster_info}] def stop(self, data): connection_info = get_connection_info(self.config) - args = ['delete', 'cluster'] - args = args + ['-v', '4'] - args = args + ['--name', self.cluster_id] + args = ["delete", "cluster"] + args = args + ["-v", "4"] + args = args + ["--name", self.cluster_id] args = args + get_region_arg(connection_info) c = EksctlCommand(args, connection_info) diff --git a/python-lib/dku_aws/aws_command.py b/python-lib/dku_aws/aws_command.py index e609a03..2337e7b 100644 --- a/python-lib/dku_aws/aws_command.py +++ b/python-lib/dku_aws/aws_command.py @@ -1,28 +1,26 @@ -import sys, os, subprocess, logging, json, requests, shutil +import os +import subprocess +import logging from dku_utils.access import _has_not_blank_property, 
_convert_to_string + class AwsCommand(object): def __init__(self, args, connection_info): self.args = args self.env = os.environ.copy() - if _has_not_blank_property(connection_info, 'accessKey'): - self.env['AWS_ACCESS_KEY_ID'] = connection_info['accessKey'] - if _has_not_blank_property(connection_info, 'secretKey'): - self.env['AWS_SECRET_ACCESS_KEY'] = connection_info['secretKey'] - if _has_not_blank_property(connection_info, 'sessionToken'): - self.env['AWS_SESSION_TOKEN'] = connection_info['sessionToken'] - if _has_not_blank_property(connection_info, 'region'): - self.env['AWS_DEFAULT_REGION'] = connection_info['region'] + if _has_not_blank_property(connection_info, "accessKey"): + self.env["AWS_ACCESS_KEY_ID"] = connection_info["accessKey"] + if _has_not_blank_property(connection_info, "secretKey"): + self.env["AWS_SECRET_ACCESS_KEY"] = connection_info["secretKey"] + if _has_not_blank_property(connection_info, "sessionToken"): + self.env["AWS_SESSION_TOKEN"] = connection_info["sessionToken"] + if _has_not_blank_property(connection_info, "region"): + self.env["AWS_DEFAULT_REGION"] = connection_info["region"] def run(self): cmd = _convert_to_string(["aws"] + self.args) - logging.info('Running %s' % (' '.join(cmd))) - p = subprocess.Popen(cmd, - shell=False, - env=self.env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True) + logging.info("Running %s" % (" ".join(cmd))) + p = subprocess.Popen(cmd, shell=False, env=self.env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) (o, e) = p.communicate() rv = p.wait() return (cmd, rv, o, e) @@ -32,20 +30,15 @@ def run_and_get_output(self): if result[1] != 0: logging.error(result[3]) cmd = _convert_to_string(["aws"] + self.args) - raise Exception('Failed to execute command: \'%s\'. See log for more details.' % (' '.join(cmd))) + raise Exception("Failed to execute command: '%s'. See log for more details." 
% (" ".join(cmd))) else: return result[2] def run_and_log(self): cmd = _convert_to_string(["aws"] + self.args) - logging.info('Running %s' % (' '.join(cmd))) - p = subprocess.Popen(cmd, - shell=False, - env=self.env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True) + logging.info("Running %s" % (" ".join(cmd))) + p = subprocess.Popen(cmd, shell=False, env=self.env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) with p.stdout as s: - for line in iter(s.readline, ''): + for line in iter(s.readline, ""): logging.info(line) return p.wait() diff --git a/python-lib/dku_aws/boto3_sts_assumerole.py b/python-lib/dku_aws/boto3_sts_assumerole.py index 3012887..ef623b6 100644 --- a/python-lib/dku_aws/boto3_sts_assumerole.py +++ b/python-lib/dku_aws/boto3_sts_assumerole.py @@ -1,15 +1,13 @@ import boto3 + class Boto3STSService(object): def __init__(self, arn): sess = boto3.Session() - sts_connection = sess.client('sts') - assume_role_object = sts_connection.assume_role( - RoleArn=arn, RoleSessionName="DSS-EKS-Plugin", - DurationSeconds=3600) - creds = assume_role_object['Credentials'] - creds['accessKey'] = creds.pop('AccessKeyId') - creds['secretKey'] = creds.pop('SecretAccessKey') - creds['sessionToken'] = creds.pop('SessionToken') + sts_connection = sess.client("sts") + assume_role_object = sts_connection.assume_role(RoleArn=arn, RoleSessionName="DSS-EKS-Plugin", DurationSeconds=3600) + creds = assume_role_object["Credentials"] + creds["accessKey"] = creds.pop("AccessKeyId") + creds["secretKey"] = creds.pop("SecretAccessKey") + creds["sessionToken"] = creds.pop("SessionToken") self.credentials = creds - \ No newline at end of file diff --git a/python-lib/dku_aws/eksctl_command.py b/python-lib/dku_aws/eksctl_command.py index b57ca6c..83d293c 100644 --- a/python-lib/dku_aws/eksctl_command.py +++ b/python-lib/dku_aws/eksctl_command.py @@ -1,30 +1,28 @@ -import sys, os, subprocess, logging, json, requests, shutil +import os +import subprocess +import logging from .eksctl_loader import get_eksctl_or_fetch from dku_utils.access import _has_not_blank_property, _convert_to_string + class EksctlCommand(object): def __init__(self, args, connection_info): self.args = args self.eksctl_bin = get_eksctl_or_fetch() self.env = os.environ.copy() - if _has_not_blank_property(connection_info, 'accessKey'): - self.env['AWS_ACCESS_KEY_ID'] = connection_info['accessKey'] - if _has_not_blank_property(connection_info, 'secretKey'): - self.env['AWS_SECRET_ACCESS_KEY'] = connection_info['secretKey'] - if _has_not_blank_property(connection_info, 'sessionToken'): - self.env['AWS_SESSION_TOKEN'] = connection_info['sessionToken'] - if _has_not_blank_property(connection_info, 'region'): - self.env['AWS_DEFAULT_REGION'] = connection_info['region'] + if _has_not_blank_property(connection_info, "accessKey"): + self.env["AWS_ACCESS_KEY_ID"] = connection_info["accessKey"] + if _has_not_blank_property(connection_info, "secretKey"): + self.env["AWS_SECRET_ACCESS_KEY"] = connection_info["secretKey"] + if _has_not_blank_property(connection_info, "sessionToken"): + self.env["AWS_SESSION_TOKEN"] = connection_info["sessionToken"] + if _has_not_blank_property(connection_info, "region"): + self.env["AWS_DEFAULT_REGION"] = connection_info["region"] def run(self): cmd = _convert_to_string([self.eksctl_bin] + self.args) - logging.info('Running %s' % (' '.join(cmd))) - p = subprocess.Popen(cmd, - shell=False, - env=self.env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - 
universal_newlines=True) + logging.info("Running %s" % (" ".join(cmd))) + p = subprocess.Popen(cmd, shell=False, env=self.env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) (o, e) = p.communicate() rv = p.wait() return (cmd, rv, o, e) @@ -34,33 +32,23 @@ def run_and_get_output(self): if result[1] != 0: logging.error(result[3]) cmd = _convert_to_string([self.eksctl_bin] + self.args) - raise Exception('Failed to execute command: \'%s\'. See log for more details.' % (' '.join(cmd))) + raise Exception("Failed to execute command: '%s'. See log for more details." % (" ".join(cmd))) else: return result[2] def run_and_log(self): cmd = _convert_to_string([self.eksctl_bin] + self.args) - logging.info('Running %s' % (' '.join(cmd))) - p = subprocess.Popen(cmd, - shell=False, - env=self.env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True) + logging.info("Running %s" % (" ".join(cmd))) + p = subprocess.Popen(cmd, shell=False, env=self.env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) with p.stdout as s: - for line in iter(s.readline, ''): + for line in iter(s.readline, ""): logging.info(line.rstrip()) return p.wait() def run_and_get(self): cmd = _convert_to_string([self.eksctl_bin] + self.args) - logging.info('Running %s' % (' '.join(cmd))) - p = subprocess.Popen(cmd, - shell=False, - env=self.env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True) + logging.info("Running %s" % (" ".join(cmd))) + p = subprocess.Popen(cmd, shell=False, env=self.env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out, err = p.communicate() rv = p.wait() return rv, out, err diff --git a/python-lib/dku_aws/eksctl_loader.py b/python-lib/dku_aws/eksctl_loader.py index ad0c78e..d0492b1 100644 --- a/python-lib/dku_aws/eksctl_loader.py +++ b/python-lib/dku_aws/eksctl_loader.py @@ -1,22 +1,27 @@ -import sys, os, subprocess, logging, json, requests, shutil +import os +import subprocess +import logging +import requests +import shutil + def get_eksctl_or_fetch(): try: - machine_eksctl = subprocess.check_output(["which", "eksctl"]).strip().decode('utf8') + machine_eksctl = subprocess.check_output(["which", "eksctl"]).strip().decode("utf8") logging.info("Found eksctl on the machine") return machine_eksctl - except: - local_eksctl_folder = os.path.join(os.environ["DIP_HOME"], 'tmp', 'local_eksctl') + except Exception: + local_eksctl_folder = os.path.join(os.environ["DIP_HOME"], "tmp", "local_eksctl") logging.info("Using eksctl from %s" % local_eksctl_folder) - local_eksctl = os.path.join(local_eksctl_folder, 'eksctl') + local_eksctl = os.path.join(local_eksctl_folder, "eksctl") if not os.path.exists(local_eksctl_folder): os.makedirs(local_eksctl_folder) if not os.path.exists(local_eksctl): - arch = subprocess.check_output(["uname", "-s"]).strip().decode('utf8') + arch = subprocess.check_output(["uname", "-s"]).strip().decode("utf8") logging.info("Downloading eksctl for %s" % arch) r = requests.get("https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_%s_amd64.tar.gz" % arch, stream=True) - local_eksctl_archive = os.path.join(local_eksctl_folder, 'eksctl.tar.gz') - with open(local_eksctl_archive, 'wb') as f: + local_eksctl_archive = os.path.join(local_eksctl_folder, "eksctl.tar.gz") + with open(local_eksctl_archive, "wb") as f: shutil.copyfileobj(r.raw, f) subprocess.check_call(["tar", "-xzf", local_eksctl_archive], cwd=local_eksctl_folder) return local_eksctl diff --git 
a/python-lib/dku_kube/autoscaler.py b/python-lib/dku_kube/autoscaler.py index bfb56fc..2399ea3 100644 --- a/python-lib/dku_kube/autoscaler.py +++ b/python-lib/dku_kube/autoscaler.py @@ -1,64 +1,72 @@ -import os, json, logging, yaml +import os +import json +import logging +import yaml from .kubectl_command import run_with_timeout from dku_utils.access import _is_none_or_blank from dku_utils.tools_version import strip_kubernetes_version from dku_utils.taints import Toleration +# fmt: off AUTOSCALER_IMAGES = { - '1.24': 'v1.24.3', - '1.25': 'v1.25.3', - '1.26': 'v1.26.4', - '1.27': 'v1.27.3', - '1.28': 'v1.28.0' + "1.24": "v1.24.3", + "1.25": "v1.25.3", + "1.26": "v1.26.4", + "1.27": "v1.27.3", + "1.28": "v1.28.0" } +# fmt: on + def has_autoscaler(kube_config_path): env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path - cmd = ['kubectl', 'get', 'pods', '--namespace', 'kube-system', '-l', 'app=cluster-autoscaler', '--ignore-not-found'] + env["KUBECONFIG"] = kube_config_path + cmd = ["kubectl", "get", "pods", "--namespace", "kube-system", "-l", "app=cluster-autoscaler", "--ignore-not-found"] logging.info("Checking autoscaler presence with : %s" % json.dumps(cmd)) out, err = run_with_timeout(cmd, env=env, timeout=5) return len(out.strip()) > 0 + def add_autoscaler_if_needed(cluster_id, cluster_config, cluster_def, kube_config_path, taints): if not has_autoscaler(kube_config_path): - kubernetes_version = cluster_config.get("k8sVersion", None) - if _is_none_or_blank(kubernetes_version): - kubernetes_version = cluster_def.get("Version") - - kubernetes_version = strip_kubernetes_version(kubernetes_version) - autoscaler_file_path = 'autoscaler.yaml' - - if float(kubernetes_version) < 1.24: - autoscaler_image = AUTOSCALER_IMAGES.get('1.24', 'v1.24.3') - else: - autoscaler_image = AUTOSCALER_IMAGES.get(kubernetes_version, 'v1.28.0') - - autoscaler_full_config = list(yaml.safe_load_all(get_autoscaler_roles())) - autoscaler_config = yaml.safe_load(get_autoscaler_config(cluster_id, autoscaler_image)) - tolerations = set() - - # If there are any taints to patch the autoscaler with in the node group(s) to create, - # we add them to the autoscaler configuration before updating with another `kubectl apply` - tolerations.update(Toleration.from_dict(taints)) - - # Patch the autoscaler with the tolerations derived from node group(s) taints if any - if tolerations: - autoscaler_config['spec']['template']['spec']['tolerations'] = Toleration.to_list(tolerations) - logging.debug('Autoscaler deployment config: %s' % yaml.safe_dump(autoscaler_config, default_flow_style=False)) - - autoscaler_full_config.append(autoscaler_config) - logging.debug('Autoscaler complete config: %s' % yaml.safe_dump_all(autoscaler_full_config, default_flow_style=False)) - - with open(autoscaler_file_path, "w") as f: - yaml.safe_dump_all(autoscaler_full_config, f, explicit_start=True) - - env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path - cmd = ['kubectl', 'create', '-f', os.path.abspath(autoscaler_file_path)] - logging.info("Create autoscaler with : %s" % json.dumps(cmd)) - run_with_timeout(cmd, env=env, timeout=5) - + kubernetes_version = cluster_config.get("k8sVersion", None) + if _is_none_or_blank(kubernetes_version): + kubernetes_version = cluster_def.get("Version") + + kubernetes_version = strip_kubernetes_version(kubernetes_version) + autoscaler_file_path = "autoscaler.yaml" + + if float(kubernetes_version) < 1.24: + autoscaler_image = AUTOSCALER_IMAGES.get("1.24", "v1.24.3") + else: + autoscaler_image = 
AUTOSCALER_IMAGES.get(kubernetes_version, "v1.28.0") + + autoscaler_full_config = list(yaml.safe_load_all(get_autoscaler_roles())) + autoscaler_config = yaml.safe_load(get_autoscaler_config(cluster_id, autoscaler_image)) + tolerations = set() + + # If there are any taints to patch the autoscaler with in the node group(s) to create, + # we add them to the autoscaler configuration before updating with another `kubectl apply` + tolerations.update(Toleration.from_dict(taints)) + + # Patch the autoscaler with the tolerations derived from node group(s) taints if any + if tolerations: + autoscaler_config["spec"]["template"]["spec"]["tolerations"] = Toleration.to_list(tolerations) + logging.debug("Autoscaler deployment config: %s" % yaml.safe_dump(autoscaler_config, default_flow_style=False)) + + autoscaler_full_config.append(autoscaler_config) + logging.debug("Autoscaler complete config: %s" % yaml.safe_dump_all(autoscaler_full_config, default_flow_style=False)) + + with open(autoscaler_file_path, "w") as f: + yaml.safe_dump_all(autoscaler_full_config, f, explicit_start=True) + + env = os.environ.copy() + env["KUBECONFIG"] = kube_config_path + cmd = ["kubectl", "create", "-f", os.path.abspath(autoscaler_file_path)] + logging.info("Create autoscaler with : %s" % json.dumps(cmd)) + run_with_timeout(cmd, env=env, timeout=5) + + def get_autoscaler_roles(): # the auto-discovery version from https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler/cloudprovider/aws # all the necessary roles and tags are handled by eksctl with the --asg-access flag @@ -182,6 +190,7 @@ def get_autoscaler_roles(): namespace: kube-system """ + def get_autoscaler_config(cluster_id, autoscaler_image_version): return """apiVersion: apps/v1 kind: Deployment @@ -228,4 +237,4 @@ def get_autoscaler_config(cluster_id, autoscaler_image_version): - name: ssl-certs hostPath: path: "/etc/ssl/certs/ca-bundle.crt" -""" % {'autoscalerimageversion': autoscaler_image_version, 'clusterid': cluster_id} \ No newline at end of file +""" % {"autoscalerimageversion": autoscaler_image_version, "clusterid": cluster_id} diff --git a/python-lib/dku_kube/busybox_pod.py b/python-lib/dku_kube/busybox_pod.py index 689f7c9..3eeb9c6 100644 --- a/python-lib/dku_kube/busybox_pod.py +++ b/python-lib/dku_kube/busybox_pod.py @@ -1,15 +1,22 @@ -import os, sys, json, yaml, logging, random, time +import os +import json +import yaml +import logging +import random +import time from .kubectl_command import run_with_timeout + class BusyboxPod(object): def __init__(self, kube_config_path): self.env = os.environ.copy() - self.env['KUBECONFIG'] = kube_config_path - uid = ''.join([random.choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(0,8)]) + self.env["KUBECONFIG"] = kube_config_path + uid = "".join([random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for i in range(0, 8)]) self.pod_name = "busybox-" + uid - + def __enter__(self): # create pod + # fmt: off pod_yaml = { "apiVersion": "v1", "kind": "Pod", @@ -29,47 +36,47 @@ def __enter__(self): "restartPolicy": "Always" } } - pod_file_path = 'busybox_pod.yaml' + # fmt: on + pod_file_path = "busybox_pod.yaml" with open(pod_file_path, "w") as f: yaml.safe_dump(pod_yaml, f) - cmd = ['kubectl', 'create', '-f', os.path.abspath(pod_file_path)] + cmd = ["kubectl", "create", "-f", os.path.abspath(pod_file_path)] logging.info("Create pod with : %s" % json.dumps(cmd)) run_with_timeout(cmd, env=self.env, timeout=5) - + # wait for it to actually run (could be stuck in pending if no resource available) 
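        # ---------------------------------------------------------------
        # Editor's note: a recent kubectl can do this wait natively; a
        # sketch of that alternative (not what this plugin does today):
        #
        #     import subprocess
        #     subprocess.check_call(
        #         ["kubectl", "wait", "--for=condition=Ready", "pod/" + self.pod_name, "--timeout=10s"],
        #         env=self.env,
        #     )
        # ---------------------------------------------------------------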
waited = 0 pod_state = self.get_pod_state() - while pod_state != 'running' and waited < 10: + while pod_state != "running" and waited < 10: time.sleep(1) waited += 1 pod_state = self.get_pod_state() - - if pod_state != 'running': + + if pod_state != "running": self.delete_pod() - raise Exception('Busybox did not start in 10s') - + raise Exception("Busybox did not start in 10s") + return self - + def get_pod_state(self): - cmd = ['kubectl', 'get', 'pod', self.pod_name, '-o', 'json'] + cmd = ["kubectl", "get", "pod", self.pod_name, "-o", "json"] logging.info("Poll pod state with : %s" % json.dumps(cmd)) out, err = run_with_timeout(cmd, env=self.env, timeout=5) - return json.loads(out)['status']['phase'].lower() + return json.loads(out)["status"]["phase"].lower() def delete_pod(self): - cmd = ['kubectl', 'delete', 'pods', self.pod_name] + cmd = ["kubectl", "delete", "pods", self.pod_name] logging.info("Delete pod with : %s" % json.dumps(cmd)) - run_with_timeout(cmd, env=self.env, timeout=3, nokill=True) # fire and forget - + run_with_timeout(cmd, env=self.env, timeout=3, nokill=True) # fire and forget + def __exit__(self, exc_type, exc_val, exc_tb): self.delete_pod() logging.info("Exited busybox") return False - + def exec_cmd(self, cmd, timeout=5): - kcmd = ['kubectl', 'exec', self.pod_name, '--'] + cmd + kcmd = ["kubectl", "exec", self.pod_name, "--"] + cmd logging.info("Execute in pod with : %s" % json.dumps(kcmd)) out, err = run_with_timeout(kcmd, env=self.env, timeout=timeout) return out, err - \ No newline at end of file diff --git a/python-lib/dku_kube/gpu_driver.py b/python-lib/dku_kube/gpu_driver.py index 1c4b37d..f17b30e 100644 --- a/python-lib/dku_kube/gpu_driver.py +++ b/python-lib/dku_kube/gpu_driver.py @@ -1,34 +1,50 @@ -import os, json, logging, requests, yaml +import os +import json +import logging +import requests +import yaml -from dku_aws.eksctl_command import EksctlCommand from dku_utils.access import _is_none_or_blank from .kubectl_command import run_with_timeout from dku_utils.taints import Toleration + def has_gpu_driver(kube_config_path): env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path - cmd = ['kubectl', 'get', 'pods', '--namespace', 'kube-system', '-l', 'name=nvidia-device-plugin-ds', '--ignore-not-found'] - logging.info('Checking if NVIDIA GPU drivers are installed with : %s' % json.dumps(cmd)) + env["KUBECONFIG"] = kube_config_path + cmd = ["kubectl", "get", "pods", "--namespace", "kube-system", "-l", "name=nvidia-device-plugin-ds", "--ignore-not-found"] + logging.info("Checking if NVIDIA GPU drivers are installed with : %s" % json.dumps(cmd)) out, err = run_with_timeout(cmd, env=env, timeout=5) return len(out.strip()) > 0 + def add_gpu_driver_if_needed(cluster_id, kube_config_path, connection_info, taints): env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path + env["KUBECONFIG"] = kube_config_path # Get the Nvidia driver plugin configuration from the repository - nvidia_config_raw = requests.get('https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/main/deployments/static/nvidia-device-plugin.yml').text + nvidia_config_raw = requests.get( + "https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/main/deployments/static/nvidia-device-plugin.yml" + ).text nvidia_config = yaml.safe_load(nvidia_config_raw) tolerations = set() # Get any tolerations from the plugin configuration - if nvidia_config.get('spec', {}) and nvidia_config['spec'].get('template', {}) and nvidia_config['spec']['template'].get('spec', {}): - 
tolerations.update(Toleration.from_dict(nvidia_config['spec']['template']['spec']['tolerations'])) + if nvidia_config.get("spec", {}) and nvidia_config["spec"].get("template", {}) and nvidia_config["spec"]["template"].get("spec", {}): + tolerations.update(Toleration.from_dict(nvidia_config["spec"]["template"]["spec"]["tolerations"])) # Retrieve the tolerations on the daemonset currently deployed to the cluster. if has_gpu_driver(kube_config_path): - cmd = ['kubectl', 'get', 'daemonset', 'nvidia-device-plugin-daemonset', '-n', 'kube-system', '-o', 'jsonpath="{.spec.template.spec.tolerations}"'] + cmd = [ + "kubectl", + "get", + "daemonset", + "nvidia-device-plugin-daemonset", + "-n", + "kube-system", + "-o", + 'jsonpath="{.spec.template.spec.tolerations}"', + ] cmd_result, err = run_with_timeout(cmd, env=env, timeout=5) tolerations_json = cmd_result[1:-1] if not _is_none_or_blank(tolerations_json): @@ -37,19 +53,19 @@ def add_gpu_driver_if_needed(cluster_id, kube_config_path, connection_info, tain # If there are any taints to patch the daemonset with in the node group(s) to create, # we add them to the GPU plugin configuration before updating with another `kubectl apply` tolerations.update(Toleration.from_dict(taints)) - + # Patch the Nvidia driver configuration with the tolerations derived from node group(s) taints, # initial Nvidia driver configuration tolerations and Nvidia daemonset tolerations (when applicable) - nvidia_config['spec']['template']['spec']['tolerations'] = Toleration.to_list(tolerations) + nvidia_config["spec"]["template"]["spec"]["tolerations"] = Toleration.to_list(tolerations) # Write the configuration locally - local_nvidia_plugin_config = os.path.join(os.environ["DIP_HOME"], 'clusters', cluster_id, 'nvidia-device-plugin.yml') + local_nvidia_plugin_config = os.path.join(os.environ["DIP_HOME"], "clusters", cluster_id, "nvidia-device-plugin.yml") with open(local_nvidia_plugin_config, "w") as f: yaml.safe_dump(nvidia_config, f) # Apply the patched Nvidia driver configuration to the cluster - cmd = ['kubectl', 'apply', '-f', local_nvidia_plugin_config] - logging.info('Running command to install Nvidia drivers: %s', ' '.join(cmd)) - logging.info('NVIDIA GPU driver config: %s' % yaml.safe_dump(nvidia_config, default_flow_style=False)) + cmd = ["kubectl", "apply", "-f", local_nvidia_plugin_config] + logging.info("Running command to install Nvidia drivers: %s", " ".join(cmd)) + logging.info("NVIDIA GPU driver config: %s" % yaml.safe_dump(nvidia_config, default_flow_style=False)) - run_with_timeout(cmd, env=env, timeout=5) \ No newline at end of file + run_with_timeout(cmd, env=env, timeout=5) diff --git a/python-lib/dku_kube/kubeconfig.py b/python-lib/dku_kube/kubeconfig.py index 44e9c77..3c02326 100644 --- a/python-lib/dku_kube/kubeconfig.py +++ b/python-lib/dku_kube/kubeconfig.py @@ -1,14 +1,19 @@ -import os, sys, json, yaml, logging -from dku_utils.access import _has_not_blank_property, _is_none_or_blank +import os +import json +import yaml +import logging +from dku_utils.access import _has_not_blank_property + def get_first_kube_config(kube_config_path=None): if kube_config_path is None: - if _has_not_blank_property(os.environ, 'KUBECONFIG'): - kube_config_path = os.environ['KUBECONFIG'].split(':')[0] + if _has_not_blank_property(os.environ, "KUBECONFIG"): + kube_config_path = os.environ["KUBECONFIG"].split(":")[0] else: - kube_config_path = os.path.join(os.environ['HOME'], '.kube', 'config') + kube_config_path = os.path.join(os.environ["HOME"], ".kube", "config") return 
kube_config_path + def merge_or_write_config(config, kube_config_path=None): kube_config_path = get_first_kube_config(kube_config_path) @@ -16,7 +21,7 @@ def merge_or_write_config(config, kube_config_path=None): logging.info("A kube config exists at %s => merging" % kube_config_path) with open(kube_config_path, "r") as f: existing = yaml.safe_load(f) - for k in ['users', 'clusters', 'contexts']: + for k in ["users", "clusters", "contexts"]: elements = existing.get(k, []) existing[k] = elements new_elements = config.get(k, []) @@ -26,7 +31,7 @@ def merge_or_write_config(config, kube_config_path=None): for i in range(0, len(elements)): if elements[i].get("name", None) == name: element_idx = i - logging.info(" %s > %s : %s" % (k, name, 'replace' if element_idx is not None else 'append')) + logging.info(" %s > %s : %s" % (k, name, "replace" if element_idx is not None else "append")) if element_idx is not None: elements[element_idx] = new_element else: @@ -48,34 +53,37 @@ def merge_or_write_config(config, kube_config_path=None): with open(kube_config_path, "w") as f: yaml.safe_dump(config, f) + def add_authenticator_env(kube_config_path, env): with open(kube_config_path, "r") as f: existing = yaml.safe_load(f) - if 'exec' in existing['users'][0]['user']: - authenticator = existing['users'][0]['user']['exec'] - authenticator_env = authenticator.get('env', []) + if "exec" in existing["users"][0]["user"]: + authenticator = existing["users"][0]["user"]["exec"] + authenticator_env = authenticator.get("env", []) if authenticator_env is None: authenticator_env = [] for k in env: - authenticator_env.append({'name':k, 'value':env[k]}) - authenticator['env'] = authenticator_env + authenticator_env.append({"name": k, "value": env[k]}) + authenticator["env"] = authenticator_env with open(kube_config_path, "w") as f: yaml.safe_dump(existing, f) + def add_assumed_arn(kube_config_path, arn): with open(kube_config_path, "r") as f: existing = yaml.safe_load(f) - if 'exec' in existing['users'][0]['user']: - existing['users'][0]['user']['exec']['args'].extend(['-r',arn]) + if "exec" in existing["users"][0]["user"]: + existing["users"][0]["user"]["exec"]["args"].extend(["-r", arn]) with open(kube_config_path, "w") as f: yaml.safe_dump(existing, f) + def setup_creds_env(kube_config_path, connection_info, config): # If the arn exists, then add it to the kubeconfig so it is the assumed role for future use - arn = config.get('assumeRoleARN', '') + arn = config.get("assumeRoleARN", "") if arn: logging.info("Assuming role %s" % arn) add_assumed_arn(kube_config_path, arn) - elif _has_not_blank_property(connection_info, 'accessKey') and _has_not_blank_property(connection_info, 'secretKey'): - creds_in_env = {'AWS_ACCESS_KEY_ID':connection_info['accessKey'], 'AWS_SECRET_ACCESS_KEY':connection_info['secretKey']} + elif _has_not_blank_property(connection_info, "accessKey") and _has_not_blank_property(connection_info, "secretKey"): + creds_in_env = {"AWS_ACCESS_KEY_ID": connection_info["accessKey"], "AWS_SECRET_ACCESS_KEY": connection_info["secretKey"]} add_authenticator_env(kube_config_path, creds_in_env) diff --git a/python-lib/dku_kube/kubectl_command.py b/python-lib/dku_kube/kubectl_command.py index 34bf583..a21b804 100644 --- a/python-lib/dku_kube/kubectl_command.py +++ b/python-lib/dku_kube/kubectl_command.py @@ -1,4 +1,6 @@ -import os, sys, json, yaml, logging, subprocess, time +import subprocess +import time + class KubeCommandException(Exception): def __init__(self, message, out, err, rv): @@ -6,13 +8,10 @@ def 
__init__(self, message, out, err, rv): self.out = out self.err = err self.rv = rv - + + def run_with_timeout(cmd, env=None, timeout=3, nokill=False): - p = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=env, - universal_newlines=True) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, universal_newlines=True) waited = 0 while waited < timeout and p.poll() is None: time.sleep(1) @@ -28,5 +27,3 @@ def run_with_timeout(cmd, env=None, timeout=3, nokill=False): if rv != 0: raise KubeCommandException("Command failed with %s" % rv, out, err, rv) return out, err - - diff --git a/python-lib/dku_kube/metrics_server.py b/python-lib/dku_kube/metrics_server.py index 60eb866..4fee737 100644 --- a/python-lib/dku_kube/metrics_server.py +++ b/python-lib/dku_kube/metrics_server.py @@ -1,16 +1,20 @@ -import os, json, logging, traceback +import os +import json +import logging +import traceback from .kubectl_command import run_with_timeout, KubeCommandException + def install_metrics_server(kube_config_path): try: env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path - cmd = ['kubectl', 'apply', '-f', 'https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml'] + env["KUBECONFIG"] = kube_config_path + cmd = ["kubectl", "apply", "-f", "https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml"] logging.info("Installing Metrics Server with : %s" % json.dumps(cmd)) out, err = run_with_timeout(cmd, env=env, timeout=30) except KubeCommandException as e: - logging.warning('Failed to install metrics server: %s' % json.dumps([cmd, e.rv, e.out, e.err])) + logging.warning("Failed to install metrics server: %s" % json.dumps([cmd, e.rv, e.out, e.err])) traceback.print_exc() except Exception: - logging.warning('Failed to install metrics server') + logging.warning("Failed to install metrics server") traceback.print_exc() diff --git a/python-lib/dku_utils/access.py b/python-lib/dku_utils/access.py index 19e6261..393a2d2 100644 --- a/python-lib/dku_utils/access.py +++ b/python-lib/dku_utils/access.py @@ -1,24 +1,26 @@ from six import text_type + try: from collections.abc import Mapping, Iterable except ImportError: from collections import Mapping, Iterable -from io import StringIO, BytesIO import sys if sys.version_info > (3,): dku_basestring_type = str else: - dku_basestring_type = basestring + dku_basestring_type = basestring # noqa: F821 + def _convert_to_string(data): - for i in range(0,len(data)): + for i in range(0, len(data)): try: data[i] = data[i].decode() except (UnicodeDecodeError, AttributeError): pass return data + def _get_in_object_or_array(o, chunk, d): if isinstance(chunk, int): if chunk >= 0 and chunk < len(o): @@ -28,30 +30,36 @@ def _get_in_object_or_array(o, chunk, d): else: return o.get(chunk, d) + def _safe_get_value(o, chunks, default_value=None): if len(chunks) == 1: return _get_in_object_or_array(o, chunks[0], default_value) else: return _safe_get_value(_get_in_object_or_array(o, chunks[0], {}), chunks[1:], default_value) + def _is_none_or_blank(x): return x is None or (isinstance(x, text_type) and len(x.strip()) == 0) + def _has_not_blank_property(d, k): return k in d and not _is_none_or_blank(d[k]) + def _default_if_blank(x, d): if _is_none_or_blank(x): return d else: return x + def _default_if_property_blank(d, k, v): - if not k in d: + if k not in d: return v x = d[k] return _default_if_blank(x, v) + def _merge_objects(a, b): if isinstance(a, Mapping) and 
isinstance(b, Mapping):
         r = {}
diff --git a/python-lib/dku_utils/cluster.py b/python-lib/dku_utils/cluster.py
index e7e8737..5757835 100644
--- a/python-lib/dku_utils/cluster.py
+++ b/python-lib/dku_utils/cluster.py
@@ -1,21 +1,24 @@
-from dku_utils.access import _default_if_blank, _default_if_property_blank
+from dku_utils.access import _default_if_property_blank
 import dataiku
 from dataiku.core.intercom import backend_json_call
 from dku_utils.access import _has_not_blank_property
 from dku_aws.boto3_sts_assumerole import Boto3STSService
-import json, logging
+import json
+import logging
+
 
 def make_overrides(config, kube_config, kube_config_path):
     # alter the spark configurations to put the cluster master and image repo in the properties
     container_settings = {
-        'executionConfigsGenericOverrides': {
-            'kubeCtlContext': kube_config["current-context"], # has to exist, it's a config file we just built
-            'kubeConfigPath': kube_config_path, # the config is not merged into the main config file, so we need to pass the config file pth
-            'baseImage': _default_if_property_blank(config, "baseImage", None),
-            'repositoryURL': _default_if_property_blank(config, "repositoryURL", None)
-        }
-    }
-    return {'container':container_settings}
+        "executionConfigsGenericOverrides": {
+            "kubeCtlContext": kube_config["current-context"], # has to exist, it's a config file we just built
+            "kubeConfigPath": kube_config_path, # the config is not merged into the main config file, so we need to pass the config file path
+            "baseImage": _default_if_property_blank(config, "baseImage", None),
+            "repositoryURL": _default_if_property_blank(config, "repositoryURL", None),
+        }
+    }
+    return {"container": container_settings}
+
 
 def get_cluster_from_dss_cluster(dss_cluster_id):
     # get the public API client
@@ -24,7 +27,7 @@
     # get the cluster object in DSS
     found = False
     for c in client.list_clusters():
-        if c['name'] == dss_cluster_id:
+        if c["name"] == dss_cluster_id:
             found = True
     if not found:
         raise Exception("DSS cluster %s doesn't exist" % dss_cluster_id)
@@ -32,45 +35,50 @@
     # get the settings in it
     dss_cluster_settings = dss_cluster.get_settings()
-    dss_cluster_config = dss_cluster_settings.get_raw()['params']['config']
+    dss_cluster_config = dss_cluster_settings.get_raw()["params"]["config"]
     # resolve since we get the config with the raw preset setup
-    dss_cluster_config = backend_json_call('plugins/get-resolved-settings', data={'elementConfig':json.dumps(dss_cluster_config), 'elementType':dss_cluster_settings.get_raw()['type']})
+    dss_cluster_config = backend_json_call(
+        "plugins/get-resolved-settings", data={"elementConfig": json.dumps(dss_cluster_config), "elementType": dss_cluster_settings.get_raw()["type"]}
+    )
     logging.info("Resolved cluster config : %s" % json.dumps(dss_cluster_config))
     cluster_data = dss_cluster_settings.get_plugin_data()
 
     return cluster_data, dss_cluster_settings, dss_cluster_config
-    
+
+
 def get_cluster_generic_property(dss_cluster_settings, key, default_value=None):
-    props = dss_cluster_settings.settings['containerSettings']['executionConfigsGenericOverrides']['properties']
+    props = dss_cluster_settings.settings["containerSettings"]["executionConfigsGenericOverrides"]["properties"]
     found_value = default_value
     for prop in props:
-        if prop['key'] == key:
-            found_value = prop['value']
+        if prop["key"] == key:
+            found_value = prop["value"]
     return found_value
 
+
 def 
set_cluster_generic_property(dss_cluster_settings, key, value, replace_if_exists=False): - props = dss_cluster_settings.settings['containerSettings']['executionConfigsGenericOverrides']['properties'] + props = dss_cluster_settings.settings["containerSettings"]["executionConfigsGenericOverrides"]["properties"] found_prop = None for prop in props: - if prop['key'] == key: + if prop["key"] == key: found_prop = prop if found_prop is None: - props.append({'key':key, 'value':value}) + props.append({"key": key, "value": value}) dss_cluster_settings.save() elif replace_if_exists: - found_prop['value'] = value + found_prop["value"] = value dss_cluster_settings.save() + def get_connection_info(config): # grab the ARN if it exists - arn = config.get('assumeRoleARN', '') - info = config.get('connectionInfo', {}) + arn = config.get("assumeRoleARN", "") + info = config.get("connectionInfo", {}) # If the arn exists use boto3 to assumeRole to it, otherwise use the regular connection info if arn: connection_info = Boto3STSService(arn).credentials - if _has_not_blank_property(info, 'region'): - connection_info['region'] = info['region'] + if _has_not_blank_property(info, "region"): + connection_info["region"] = info["region"] else: connection_info = info - return connection_info \ No newline at end of file + return connection_info diff --git a/python-lib/dku_utils/config_parser.py b/python-lib/dku_utils/config_parser.py index f9feb32..7a66c01 100644 --- a/python-lib/dku_utils/config_parser.py +++ b/python-lib/dku_utils/config_parser.py @@ -1,11 +1,15 @@ # Provide some utility methods to parse the saved configuration, clean it, # normalize it and return in a predefined format (ex: command line args) -import os, logging, requests, json +import os +import logging +import requests +import json from dku_utils.access import _has_not_blank_property -SECURITY_GROUPS = 'securityGroups' -SECURITY_GROUPS_ARG = '--node-security-groups' +SECURITY_GROUPS = "securityGroups" +SECURITY_GROUPS_ARG = "--node-security-groups" + def get_security_groups_arg(config): """ @@ -23,38 +27,44 @@ def get_security_groups_arg(config): params = list(map(lambda param: param.strip(), params)) params = list(filter(None, params)) - return [SECURITY_GROUPS_ARG, ','.join(params)] + return [SECURITY_GROUPS_ARG, ",".join(params)] REGION_ARG = "--region" + def get_region_fallback_to_metadata(connection_info): - if _has_not_blank_property(connection_info, 'region'): - logging.info("Using region %s" % connection_info['region']) - return connection_info['region'] - if 'AWS_DEFAULT_REGION' in os.environ: - logging.info("Using AWS_DEFAULT_REGION %s" % os.environ['AWS_DEFAULT_REGION']) - return os.environ['AWS_DEFAULT_REGION'] + if _has_not_blank_property(connection_info, "region"): + logging.info("Using region %s" % connection_info["region"]) + return connection_info["region"] + if "AWS_DEFAULT_REGION" in os.environ: + logging.info("Using AWS_DEFAULT_REGION %s" % os.environ["AWS_DEFAULT_REGION"]) + return os.environ["AWS_DEFAULT_REGION"] try: - imds_token = requests.put("http://169.254.169.254/latest/api/token", headers = {'X-aws-ec2-metadata-token-ttl-seconds': '2160'}).text - document = requests.get("http://169.254.169.254/latest/dynamic/instance-identity/document", headers={ 'X-aws-ec2-metadata-token': imds_token}).text - return json.loads(document).get('region') + imds_token = requests.put("http://169.254.169.254/latest/api/token", headers={"X-aws-ec2-metadata-token-ttl-seconds": "2160"}).text + document = requests.get( + 
"http://169.254.169.254/latest/dynamic/instance-identity/document", headers={"X-aws-ec2-metadata-token": imds_token} + ).text + return json.loads(document).get("region") except Exception as e: logging.error("Failed to get region from metadata: %s" % str(e)) return None + def get_region_arg(connection_info): region = get_region_fallback_to_metadata(connection_info) if region is not None: return [REGION_ARG, region] return [] + def get_private_ip_from_metadata(): try: - imds_token = requests.put("http://169.254.169.254/latest/api/token", headers = {'X-aws-ec2-metadata-token-ttl-seconds': '2160'}).text - document = requests.get("http://169.254.169.254/latest/dynamic/instance-identity/document", headers={ 'X-aws-ec2-metadata-token': imds_token}).text - return json.loads(document).get('privateIp') + imds_token = requests.put("http://169.254.169.254/latest/api/token", headers={"X-aws-ec2-metadata-token-ttl-seconds": "2160"}).text + document = requests.get( + "http://169.254.169.254/latest/dynamic/instance-identity/document", headers={"X-aws-ec2-metadata-token": imds_token} + ).text + return json.loads(document).get("privateIp") except Exception as e: logging.error("Failed to get region from metadata: %s" % str(e)) return None - diff --git a/python-lib/dku_utils/node_pool.py b/python-lib/dku_utils/node_pool.py index 339df27..783859c 100644 --- a/python-lib/dku_utils/node_pool.py +++ b/python-lib/dku_utils/node_pool.py @@ -1,118 +1,128 @@ import logging from dku_utils.access import _is_none_or_blank + def get_node_pool_args(node_pool): args = [] - if 'machineType' in node_pool: - args = args + ['--node-type', node_pool['machineType']] - if 'diskType' in node_pool: - args = args + ['--node-volume-type', node_pool['diskType']] - if 'diskSizeGb' in node_pool and node_pool['diskSizeGb'] > 0: - disk_size_gb = node_pool['diskSizeGb'] + if "machineType" in node_pool: + args = args + ["--node-type", node_pool["machineType"]] + if "diskType" in node_pool: + args = args + ["--node-volume-type", node_pool["diskType"]] + if "diskSizeGb" in node_pool and node_pool["diskSizeGb"] > 0: + disk_size_gb = node_pool["diskSizeGb"] else: - disk_size_gb = 200 # also defined as default value in parameter-sets/node-pool-request/parameter-set.json - args = args + ['--node-volume-size', str(disk_size_gb)] + disk_size_gb = 200 # also defined as default value in parameter-sets/node-pool-request/parameter-set.json + args = args + ["--node-volume-size", str(disk_size_gb)] - args = args + ['--nodes', str(node_pool.get('numNodes', 3))] - if node_pool.get('numNodesAutoscaling', False): - args = args + ['--asg-access'] - args = args + ['--nodes-min', str(node_pool.get('minNumNodes', 2))] - args = args + ['--nodes-max', str(node_pool.get('maxNumNodes', 5))] + args = args + ["--nodes", str(node_pool.get("numNodes", 3))] + if node_pool.get("numNodesAutoscaling", False): + args = args + ["--asg-access"] + args = args + ["--nodes-min", str(node_pool.get("minNumNodes", 2))] + args = args + ["--nodes-max", str(node_pool.get("maxNumNodes", 5))] - tags = node_pool.get('tags', {}) + tags = node_pool.get("tags", {}) if tags: - tag_list = [key + '=' + value for key, value in tags.items()] - args = args + ['--tags', ','.join(tag_list)] + tag_list = [key + "=" + value for key, value in tags.items()] + args = args + ["--tags", ",".join(tag_list)] - if node_pool.get('useSpotInstances', False): - args = args + ['--managed', '--spot'] + if node_pool.get("useSpotInstances", False): + args = args + ["--managed", "--spot"] - if 
node_pool.get('publicKeyName', ''): - args = args + ['--ssh-access'] - args = args + ['--ssh-public-key', node_pool.get('publicKeyName', '')] + if node_pool.get("publicKeyName", ""): + args = args + ["--ssh-access"] + args = args + ["--ssh-public-key", node_pool.get("publicKeyName", "")] - node_pool['labels'] = node_pool.get('labels', {}) - if node_pool['labels']: + node_pool["labels"] = node_pool.get("labels", {}) + if node_pool["labels"]: labels = [] - for label_key, label_value in node_pool['labels'].items(): + for label_key, label_value in node_pool["labels"].items(): if not label_key: - logging.error('At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: %s' % node_pool['labels']) - raise Exception('At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: %s' % node_pool['labels']) + logging.error( + "At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: %s" % node_pool["labels"] + ) + raise Exception( + "At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: %s" % node_pool["labels"] + ) if not label_value: - label_value = '' - labels.append('%s=%s' % (label_key, label_value)) - args += ['--node-labels', ','.join(labels)] + label_value = "" + labels.append("%s=%s" % (label_key, label_value)) + args += ["--node-labels", ",".join(labels)] return args + def get_node_pool_yaml(node_pool, networking_settings): yaml = { - 'iam': { - 'withAddonPolicies': { - 'imageBuilder': True # Adding full ECR access to the node group + "iam": { + "withAddonPolicies": { + "imageBuilder": True # Adding full ECR access to the node group } } } - if 'machineType' in node_pool: - yaml['instanceType'] = node_pool['machineType'] + if "machineType" in node_pool: + yaml["instanceType"] = node_pool["machineType"] - if 'diskType' in node_pool: - yaml['volumeType'] = node_pool['diskType'] + if "diskType" in node_pool: + yaml["volumeType"] = node_pool["diskType"] - if 'diskSizeGb' in node_pool and node_pool['diskSizeGb'] > 0: - yaml['volumeSize'] = node_pool['diskSizeGb'] + if "diskSizeGb" in node_pool and node_pool["diskSizeGb"] > 0: + yaml["volumeSize"] = node_pool["diskSizeGb"] else: - yaml['volumeSize'] = 200 # also defined as default value in parameter-sets/node-pool-request/parameter-set.json - - yaml['desiredCapacity'] = node_pool.get('numNodes', 3) - if node_pool.get('numNodesAutoscaling', False): - yaml['iam']['withAddonPolicies']['autoScaler'] = True - yaml['minSize'] = node_pool.get('minNumNodes', 2) - yaml['maxSize'] = node_pool.get('maxNumNodes', 5) - yaml['propagateASGTags'] = True - - yaml['tags'] = node_pool.get('tags', {}) - yaml['taints'] = build_node_pool_taints_yaml(node_pool) - node_pool['labels'] = node_pool.get('labels', {}) - if any(_is_none_or_blank(label_key) for label_key in node_pool['labels'].keys()): - logging.error('At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: [%s]' % ';'.join(node_pool['labels'])) - raise Exception('At least one node pool label key is not valid, please ensure label keys are not empty. 
Observed labels: [%s]' % ';'.join(node_pool['labels'])) - yaml['labels'] = node_pool['labels'] - yaml['spot'] = node_pool.get('useSpotInstances', False) - - sshPublicKeyName = node_pool.get('publicKeyName', None) + yaml["volumeSize"] = 200 # also defined as default value in parameter-sets/node-pool-request/parameter-set.json + + yaml["desiredCapacity"] = node_pool.get("numNodes", 3) + if node_pool.get("numNodesAutoscaling", False): + yaml["iam"]["withAddonPolicies"]["autoScaler"] = True + yaml["minSize"] = node_pool.get("minNumNodes", 2) + yaml["maxSize"] = node_pool.get("maxNumNodes", 5) + yaml["propagateASGTags"] = True + + yaml["tags"] = node_pool.get("tags", {}) + yaml["taints"] = build_node_pool_taints_yaml(node_pool) + node_pool["labels"] = node_pool.get("labels", {}) + if any(_is_none_or_blank(label_key) for label_key in node_pool["labels"].keys()): + logging.error( + "At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: [%s]" + % ";".join(node_pool["labels"]) + ) + raise Exception( + "At least one node pool label key is not valid, please ensure label keys are not empty. Observed labels: [%s]" + % ";".join(node_pool["labels"]) + ) + yaml["labels"] = node_pool["labels"] + yaml["spot"] = node_pool.get("useSpotInstances", False) + + sshPublicKeyName = node_pool.get("publicKeyName", None) if not _is_none_or_blank(sshPublicKeyName): - yaml['ssh'] = { - 'allow': True, - 'publicKeyName': sshPublicKeyName - } + yaml["ssh"] = {"allow": True, "publicKeyName": sshPublicKeyName} - if networking_settings.get('securityGroups', []): - yaml['securityGroups'] = { - 'attachIDs': list(map(lambda security_group: security_group.strip(), networking_settings['securityGroups'])) - } - yaml['privateNetworking'] = networking_settings.get('privateNetworking', False) + if networking_settings.get("securityGroups", []): + yaml["securityGroups"] = {"attachIDs": list(map(lambda security_group: security_group.strip(), networking_settings["securityGroups"]))} + yaml["privateNetworking"] = networking_settings.get("privateNetworking", False) - if node_pool.get('addPreBootstrapCommands', False) and not _is_none_or_blank(node_pool.get('preBootstrapCommands', None)): - yaml['preBootstrapCommands'] = yaml.get('preBootstrapCommands', []) - yaml['preBootstrapCommands'] += [command.strip()\ - for command in node_pool['preBootstrapCommands'].split('\n')\ - if not _is_none_or_blank(command)] + if node_pool.get("addPreBootstrapCommands", False) and not _is_none_or_blank(node_pool.get("preBootstrapCommands", None)): + yaml["preBootstrapCommands"] = yaml.get("preBootstrapCommands", []) + yaml["preBootstrapCommands"] += [ + command.strip() for command in node_pool["preBootstrapCommands"].split("\n") if not _is_none_or_blank(command) + ] return yaml + def build_node_pool_taints_yaml(node_pool): - node_pool['taints'] = node_pool.get('taints', []) + node_pool["taints"] = node_pool.get("taints", []) yaml_taints = [] - if node_pool['taints']: - for taint in node_pool['taints']: - if not _is_none_or_blank(taint.get('key', None)): - yaml_taints.append({ - 'key': taint['key'], - 'value': taint.get('value', ''), - 'effect': taint.get('effect', 'NoSchedule') - }) + if node_pool["taints"]: + for taint in node_pool["taints"]: + if not _is_none_or_blank(taint.get("key", None)): + yaml_taints.append({"key": taint["key"], "value": taint.get("value", ""), "effect": taint.get("effect", "NoSchedule")}) else: - logging.error('A node pool taint is invalid, please ensure that the key to a taint is not 
empty. Observed taints: [%s]' % ';'.join(node_pool['taints'])) - raise Exception('A node pool taint is invalid, please ensure that the key to a taint is not empty. Observed taints: [%s]' % ';'.join(node_pool['taints'])) - return yaml_taints \ No newline at end of file + logging.error( + "A node pool taint is invalid, please ensure that the key to a taint is not empty. Observed taints: [%s]" + % ";".join(node_pool["taints"]) + ) + raise Exception( + "A node pool taint is invalid, please ensure that the key to a taint is not empty. Observed taints: [%s]" + % ";".join(node_pool["taints"]) + ) + return yaml_taints diff --git a/python-lib/dku_utils/taints.py b/python-lib/dku_utils/taints.py index 47fb07a..4a9cf8f 100644 --- a/python-lib/dku_utils/taints.py +++ b/python-lib/dku_utils/taints.py @@ -1,45 +1,50 @@ import json from dku_utils.access import _is_none_or_blank + class Taint(dict): def __init__(self, taint): - if not _is_none_or_blank(taint.get('key', None)): - self['key'] = taint.get('key', '') + if not _is_none_or_blank(taint.get("key", None)): + self["key"] = taint.get("key", "") - if not _is_none_or_blank(taint.get('value', None)): - self['value'] = taint.get('value', '') + if not _is_none_or_blank(taint.get("value", None)): + self["value"] = taint.get("value", "") - if not _is_none_or_blank(taint.get('effect', None)): - self['effect'] = taint.get('effect', '') + if not _is_none_or_blank(taint.get("effect", None)): + self["effect"] = taint.get("effect", "") def __eq__(self, other): - return self.get('key', '') == other.get('key', '') and self.get('value', '') == other.get('value', '') and self.get('effect', '') == other.get('effect', '') + return ( + self.get("key", "") == other.get("key", "") + and self.get("value", "") == other.get("value", "") + and self.get("effect", "") == other.get("effect", "") + ) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): - return hash((self.get('key', ''), self.get('value', ''), self.get('effect', ''))) + return hash((self.get("key", ""), self.get("value", ""), self.get("effect", ""))) + class Toleration(Taint): def __init__(self, taint): super(Toleration, self).__init__(taint) - if self.get('value', ''): - self['operator'] = 'Equal' + if self.get("value", ""): + self["operator"] = "Equal" else: - self['operator'] = 'Exists' + self["operator"] = "Exists" def __eq__(self, other): - return super(Toleration, self).__eq__(other) and self.get('operator', '') == other.get('operator', '') - + return super(Toleration, self).__eq__(other) and self.get("operator", "") == other.get("operator", "") def __hash__(self): - return hash((super(Toleration, self).__hash__(), self.get('operator', ''))) + return hash((super(Toleration, self).__hash__(), self.get("operator", ""))) def to_dict(self): return {k: v for k, v in self.items()} - + @staticmethod def from_json(tolerations_json): return [Toleration(tol) for tol in json.loads(tolerations_json)] @@ -50,4 +55,4 @@ def from_dict(raw_dicts): @staticmethod def to_list(tolerations): - return [toleration.to_dict() for toleration in tolerations] \ No newline at end of file + return [toleration.to_dict() for toleration in tolerations] diff --git a/python-lib/dku_utils/tools_version.py b/python-lib/dku_utils/tools_version.py index 67d5eb1..b1b802c 100644 --- a/python-lib/dku_utils/tools_version.py +++ b/python-lib/dku_utils/tools_version.py @@ -1,20 +1,22 @@ -import json, re -from dku_kube.kubectl_command import run_with_timeout, KubeCommandException -from dku_aws.eksctl_command import EksctlCommand 
-from dku_utils.cluster import get_connection_info +import json +import re +from dku_kube.kubectl_command import run_with_timeout + def get_kubectl_version(): - cmd = ['kubectl', 'version', '--client', '-o', 'json'] + cmd = ["kubectl", "version", "--client", "-o", "json"] out, err = run_with_timeout(cmd) - return json.loads(out)['clientVersion'] + return json.loads(out)["clientVersion"] + def kubectl_version_to_string(kubectl_version): """ Writes as a string the Kubernetes version coming from outcome of `kubectl version` command """ - major = str(kubectl_version['major']) if 'major' in kubectl_version else '' - minor = str(kubectl_version['minor']) if 'minor' in kubectl_version else '' - return major + '.' + minor + major = str(kubectl_version["major"]) if "major" in kubectl_version else "" + minor = str(kubectl_version["minor"]) if "minor" in kubectl_version else "" + return major + "." + minor + def get_kubectl_version_int(kubectl_version): """ @@ -23,14 +25,15 @@ def get_kubectl_version_int(kubectl_version): """ # the kubectl version downloaded from Amazon website has a minor version finishing by '+' # keeping only the first numeric sequence for the minor version - if 'major' not in kubectl_version or 'minor' not in kubectl_version: + if "major" not in kubectl_version or "minor" not in kubectl_version: raise Exception("Kubectl version found on the machine: %s. It is not correctly formatted" % kubectl_version_to_string(kubectl_version)) regex_minor_int = re.compile("^[^0-9]*([0-9]+)([^0-9].*$|$)") - search_results_minor_int = re.search(regex_minor_int, kubectl_version['minor']) + search_results_minor_int = re.search(regex_minor_int, kubectl_version["minor"]) if not search_results_minor_int or not search_results_minor_int.groups(): raise Exception("Kubectl version found on the machine: %s. It was not possible to parse" % kubectl_version_to_string(kubectl_version)) minor_int = int(search_results_minor_int.groups()[0]) - return int(kubectl_version['major']), minor_int + return int(kubectl_version["major"]), minor_int + def strip_kubernetes_version(k8s_version_input): """ @@ -42,10 +45,12 @@ def strip_kubernetes_version(k8s_version_input): raise Exception("Kubectl version specified: %s. No valid Kubernetes version found", k8s_version_input) return search_results_k8s_version.groups()[0] + def get_authenticator_version(): - cmd = ['aws-iam-authenticator', 'version', '-o', 'json'] + cmd = ["aws-iam-authenticator", "version", "-o", "json"] out, err = run_with_timeout(cmd) - return json.loads(out)['Version'].lstrip('v') + return json.loads(out)["Version"].lstrip("v") + def kubectl_should_use_beta_apiVersion(kubectl_version): version_int = get_kubectl_version_int(kubectl_version) @@ -53,9 +58,12 @@ def kubectl_should_use_beta_apiVersion(kubectl_version): minor = version_int[1] return major > 1 or (major == 1 and minor > 23) # v1alpha1 was deprecated in 1.24 + def check_versions(): kubectl_version = get_kubectl_version() authenticator_version = get_authenticator_version() - if kubectl_should_use_beta_apiVersion(kubectl_version) and authenticator_version < '0.5.4': - raise Exception('Found kubectl %s and aws-iam-authenticator %s, which are incompatible. Please upgrade aws-iam-authenticator.' - % (kubectl_version['major']+'.'+(kubectl_version['minor']), authenticator_version)) + if kubectl_should_use_beta_apiVersion(kubectl_version) and authenticator_version < "0.5.4": + raise Exception( + "Found kubectl %s and aws-iam-authenticator %s, which are incompatible. Please upgrade aws-iam-authenticator." 
+ % (kubectl_version["major"] + "." + (kubectl_version["minor"]), authenticator_version) + ) diff --git a/python-runnables/add-autoscaler/runnable.py b/python-runnables/add-autoscaler/runnable.py index fbf3298..bb5f3d0 100644 --- a/python-runnables/add-autoscaler/runnable.py +++ b/python-runnables/add-autoscaler/runnable.py @@ -1,20 +1,19 @@ from dataiku.runnables import Runnable -import dataiku -import os, json, logging from dku_kube.autoscaler import add_autoscaler_if_needed, has_autoscaler from dku_utils.cluster import get_cluster_from_dss_cluster + class MyRunnable(Runnable): def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config self.plugin_config = plugin_config - + def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config["clusterId"]) # retrieve the actual name in the cluster's data if cluster_data is None: @@ -25,10 +24,10 @@ def run(self, progress_callback): cluster_id = cluster_def["Name"] # the cluster is accessible via the kubeconfig - kube_config_path = dss_cluster_settings.get_raw()['containerSettings']['executionConfigsGenericOverrides']['kubeConfigPath'] - + kube_config_path = dss_cluster_settings.get_raw()["containerSettings"]["executionConfigsGenericOverrides"]["kubeConfigPath"] + if has_autoscaler(kube_config_path): - return '
An autoscaler pod already runs' + return "An autoscaler pod already runs" else: add_autoscaler_if_needed(cluster_id, self.config, cluster_def, kube_config_path, []) - return 'Created an autoscaler pod' + return "Created an autoscaler pod
" diff --git a/python-runnables/add-node-pool/runnable.py b/python-runnables/add-node-pool/runnable.py index a798805..140ef9a 100644 --- a/python-runnables/add-node-pool/runnable.py +++ b/python-runnables/add-node-pool/runnable.py @@ -1,26 +1,27 @@ from dataiku.runnables import Runnable -import dataiku -import os, json, logging, yaml +import os +import logging +import yaml from dku_kube.autoscaler import add_autoscaler_if_needed from dku_kube.gpu_driver import add_gpu_driver_if_needed from dku_aws.eksctl_command import EksctlCommand -from dku_aws.aws_command import AwsCommand from dku_utils.cluster import get_cluster_from_dss_cluster, get_connection_info from dku_utils.config_parser import get_security_groups_arg, get_region_arg from dku_utils.node_pool import get_node_pool_args, build_node_pool_taints_yaml from dku_utils.access import _is_none_or_blank + class MyRunnable(Runnable): def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config self.plugin_config = plugin_config - + def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config["clusterId"]) # retrieve the actual name in the cluster's data if cluster_data is None: @@ -31,29 +32,29 @@ def run(self, progress_callback): cluster_id = cluster_def["Name"] # the cluster is accessible via the kubeconfig - kube_config_path = dss_cluster_settings.get_raw()['containerSettings']['executionConfigsGenericOverrides']['kubeConfigPath'] + kube_config_path = dss_cluster_settings.get_raw()["containerSettings"]["executionConfigsGenericOverrides"]["kubeConfigPath"] + + connection_info = get_connection_info(dss_cluster_config.get("config")) - connection_info = get_connection_info(dss_cluster_config.get('config')) + node_pool = self.config.get("nodePool", {}) + node_group_id = node_pool.get("nodeGroupId", None) - node_pool = self.config.get('nodePool', {}) - node_group_id = node_pool.get('nodeGroupId', None) - # first pass: get the yaml config corresponding to the command line args - args = ['create', 'nodegroup'] - args = args + ['-v', '3'] # not -v 4 otherwise there is a debug line in the beginning of the output - args = args + ['--cluster', cluster_id] + args = ["create", "nodegroup"] + args = args + ["-v", "3"] # not -v 4 otherwise there is a debug line in the beginning of the output + args = args + ["--cluster", cluster_id] if node_group_id is not None and len(node_group_id) > 0: - args = args + ['--name', node_group_id] - + args = args + ["--name", node_group_id] + args = args + get_region_arg(connection_info) - - if dss_cluster_config['config'].get('useEcr', False): - args = args + ['--full-ecr-access'] - - if dss_cluster_config.get('privateNetworking', False) or self.config.get('privateNetworking', None): - args = args + ['--node-private-networking'] - - args += get_security_groups_arg(dss_cluster_config['config'].get('networkingSettings', {})) + + if dss_cluster_config["config"].get("useEcr", False): + args = args + ["--full-ecr-access"] + + if dss_cluster_config.get("privateNetworking", False) or self.config.get("privateNetworking", None): + args = args + ["--node-private-networking"] + + args += get_security_groups_arg(dss_cluster_config["config"].get("networkingSettings", {})) args += get_node_pool_args(node_pool) @@ -62,58 +63,59 @@ def run(self, 
progress_callback): logging.info("Got spec:\n%s" % yaml_spec) yaml_dict = yaml.safe_load(yaml_spec) - - # second step: add the stuff that has no equivalent command line arg, and run the + + # second step: add the stuff that has no equivalent command line arg, and run the # eksctl command on the yaml config - if node_pool.get('addPreBootstrapCommands', False) and not _is_none_or_blank(node_pool.get("preBootstrapCommands", None)): + if node_pool.get("addPreBootstrapCommands", False) and not _is_none_or_blank(node_pool.get("preBootstrapCommands", None)): # has to be added in the yaml, there is no command line flag for that commands = node_pool.get("preBootstrapCommands", "") - for node_pool_dict in yaml_dict['managedNodeGroups']: - if node_pool_dict.get('preBootstrapCommands') is None: - node_pool_dict['preBootstrapCommands'] = [] - for command in commands.split('\n'): + for node_pool_dict in yaml_dict["managedNodeGroups"]: + if node_pool_dict.get("preBootstrapCommands") is None: + node_pool_dict["preBootstrapCommands"] = [] + for command in commands.split("\n"): if len(command.strip()) > 0: - node_pool_dict['preBootstrapCommands'].append(command) + node_pool_dict["preBootstrapCommands"].append(command) # Adding node pool taints on the only node pool we create which is managed: node_group_taints = build_node_pool_taints_yaml(node_pool) - yaml_dict['managedNodeGroups'][0]['taints'] = node_group_taints + yaml_dict["managedNodeGroups"][0]["taints"] = node_group_taints # Adding propagateASGTags to the node group if it is autoscaled. - # This propagates the labels/taints of the node group to the autoscaling group so that new nodes can be properly configured on creation (scaling up) - if node_pool.get('numNodesAutoscaling', False): - yaml_dict['managedNodeGroups'][0]['propagateASGTags'] = True + # This propagates the labels/taints of the node group to the autoscaling group + # so that new nodes can be properly configured on creation (scaling up) + if node_pool.get("numNodesAutoscaling", False): + yaml_dict["managedNodeGroups"][0]["propagateASGTags"] = True - yaml_loc = os.path.join(os.getcwd(), cluster_id +'_config.yaml') - with open(yaml_loc, 'w') as outfile: + yaml_loc = os.path.join(os.getcwd(), cluster_id + "_config.yaml") + with open(yaml_loc, "w") as outfile: yaml.dump(yaml_dict, outfile, default_flow_style=False) logging.info("Final spec\n%s" % yaml.dump(yaml_dict)) - args = ['create', 'nodegroup'] - args = args + ['-v', '4'] - args = args + ['-f', yaml_loc] + args = ["create", "nodegroup"] + args = args + ["-v", "4"] + args = args + ["-f", yaml_loc] c = EksctlCommand(args, connection_info) if c.run_and_log() != 0: raise Exception("Failed to add nodegroup") - - if node_pool.get('numNodesAutoscaling', False): + + if node_pool.get("numNodesAutoscaling", False): logging.info("Nodegroup is autoscaling, ensuring autoscaler") add_autoscaler_if_needed(cluster_id, self.config, cluster_data.get("cluster"), kube_config_path, node_group_taints) - - if node_pool.get('enableGPU', False): + + if node_pool.get("enableGPU", False): logging.info("Nodegroup is GPU-enabled, ensuring NVIDIA GPU Drivers") - add_gpu_driver_if_needed(self.config['clusterId'], kube_config_path, connection_info, node_group_taints) + add_gpu_driver_if_needed(self.config["clusterId"], kube_config_path, connection_info, node_group_taints) - args = ['get', 'nodegroup'] - #args = args + ['-v', '4'] - args = args + ['--cluster', cluster_id] + args = ["get", "nodegroup"] + # args = args + ['-v', '4'] + args = args + ["--cluster", 
cluster_id] args = args + get_region_arg(connection_info) - args = args + ['-o', 'json'] + args = args + ["-o", "json"] c = EksctlCommand(args, connection_info) node_groups_str = c.run_and_get_output() - + return '
Nodegroup added\n%s
' % node_groups_str diff --git a/python-runnables/inspect-node-pools/runnable.py b/python-runnables/inspect-node-pools/runnable.py index c48401b..fb1293b 100644 --- a/python-runnables/inspect-node-pools/runnable.py +++ b/python-runnables/inspect-node-pools/runnable.py @@ -1,6 +1,5 @@ from dataiku.runnables import Runnable -import dataiku -import json, logging, os +import json from dku_aws.eksctl_command import EksctlCommand from dku_aws.aws_command import AwsCommand from dku_utils.cluster import get_cluster_from_dss_cluster, get_connection_info @@ -12,12 +11,12 @@ def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config self.plugin_config = plugin_config - + def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config["clusterId"]) # retrieve the actual name in the cluster's data if cluster_data is None: @@ -26,53 +25,69 @@ def run(self, progress_callback): if cluster_def is None: raise Exception("No cluster definition (starting failed?)") cluster_id = cluster_def["Name"] - - connection_info = get_connection_info(dss_cluster_config.get('config')) - - node_group_id = self.config.get('nodeGroupId', None) + + connection_info = get_connection_info(dss_cluster_config.get("config")) + + node_group_id = self.config.get("nodeGroupId", None) # There's a bug in `eksctl` where the `StackName` doesn't appear in the output # if `get nodegroup` is called with a single node group name and that node # group is "managed". So, always get all node groups, even if only one is # specified in the params and even though we don't use the `StackName` anymore. - args = ['get', 'nodegroup'] - args = args + ['--cluster', cluster_id] + args = ["get", "nodegroup"] + args = args + ["--cluster", cluster_id] args = args + get_region_arg(connection_info) - args = args + ['-o', 'json'] + args = args + ["-o", "json"] c = EksctlCommand(args, connection_info) node_groups = json.loads(c.run_and_get_output()) if node_group_id and len(node_group_id) != 0: - node_groups = list(filter(lambda node_group: node_group.get('Name') == node_group_id, node_groups)) + node_groups = list(filter(lambda node_group: node_group.get("Name") == node_group_id, node_groups)) if len(node_groups) == 0: return '
%s\nUnable to get details
' % (node_group_id) node_group_outputs = [] for node_group in node_groups: - node_group_id = node_group.get('Name') - node_group_auto_scaling_id = node_group.get('AutoScalingGroupName') + node_group_id = node_group.get("Name") + node_group_auto_scaling_id = node_group.get("AutoScalingGroupName") if node_group_auto_scaling_id is None: - node_group_outputs.append('
%s\nUnable to get auto-scaling group\n%s' % (node_group_id, json.dumps(node_group, indent=2))) + node_group_outputs.append( + '%s\nUnable to get auto-scaling group\n%s
' + % (node_group_id, json.dumps(node_group, indent=2)) + ) continue - args = ['autoscaling', 'describe-auto-scaling-groups'] - args = args + ['--auto-scaling-group-names', node_group_auto_scaling_id] + args = ["autoscaling", "describe-auto-scaling-groups"] + args = args + ["--auto-scaling-group-names", node_group_auto_scaling_id] c = AwsCommand(args, connection_info) - auto_scaling_resources = json.loads(c.run_and_get_output()).get('AutoScalingGroups', []) - + auto_scaling_resources = json.loads(c.run_and_get_output()).get("AutoScalingGroups", []) + if len(auto_scaling_resources) == 0: - node_group_outputs.append('
%s\nUnable to get auto-scaling group\'s resources\n%s' % (node_group_id, json.dumps(node_group, indent=2))) + node_group_outputs.append( + '%s\nUnable to get auto-scaling group\'s resources\n%s
' + % (node_group_id, json.dumps(node_group, indent=2)) + ) continue - + auto_scaling_resource = auto_scaling_resources[0] - - min_instances = auto_scaling_resource.get('MinSize','') - cur_instances = len(auto_scaling_resource.get('Instances',[])) - max_instances = auto_scaling_resource.get('MaxSize','') - node_group_outputs.append('
%s\n%s\nMin=%s, current=%s, max=%s\n%s' % (node_group_id, json.dumps(node_group, indent=2), min_instances, cur_instances, max_instances, json.dumps(auto_scaling_resource.get('Instances', []), indent=2))) - - return '%s' % ''.join(node_group_outputs) \ No newline at end of file + + min_instances = auto_scaling_resource.get("MinSize", "") + cur_instances = len(auto_scaling_resource.get("Instances", [])) + max_instances = auto_scaling_resource.get("MaxSize", "") + node_group_outputs.append( + '%s\n%s\nMin=%s, current=%s, max=%s\n%s' + % ( + node_group_id, + json.dumps(node_group, indent=2), + min_instances, + cur_instances, + max_instances, + json.dumps(auto_scaling_resource.get("Instances", []), indent=2), + ) + ) + + return "%s
" % "".join(node_group_outputs) diff --git a/python-runnables/install-alb-controller/runnable.py b/python-runnables/install-alb-controller/runnable.py index dc03753..bc345b8 100644 --- a/python-runnables/install-alb-controller/runnable.py +++ b/python-runnables/install-alb-controller/runnable.py @@ -1,7 +1,9 @@ from dataiku.runnables import Runnable -import dataiku -import json, logging, os, re, tempfile, time -import requests +import json +import logging +import os +import re +import requests from dku_aws.eksctl_command import EksctlCommand from dku_aws.aws_command import AwsCommand from dku_utils.cluster import get_cluster_from_dss_cluster, get_cluster_generic_property, set_cluster_generic_property, get_connection_info @@ -9,29 +11,38 @@ from dku_utils.access import _is_none_or_blank from dku_utils.config_parser import get_region_arg + def make_html(command_outputs): divs = [] for command_output in command_outputs: - cmd_html = '
Run: %s' % json.dumps(command_output[0]) - rv_html = 'Returned %s' % command_output[1] - out_html = 'Output\n%s' % command_output[2] - err_html = 'Error\n%s' % command_output[3] + cmd_html = "Run: %s" % json.dumps(command_output[0]) + rv_html = "Returned %s" % command_output[1] + out_html = ( + 'Output\n%s' + % command_output[2] + ) + err_html = ( + 'Error\n%s
' + % command_output[3] + ) divs.append(cmd_html) divs.append(rv_html) divs.append(out_html) if command_output[1] != 0 and not _is_none_or_blank(command_output[3]): divs.append(err_html) - html = '\n'.join(divs) + html = "\n".join(divs) try: - html.decode('utf8') + html.decode("utf8") except (UnicodeDecodeError, AttributeError): pass return html + class InstallAlb(Runnable): """ Installs a ALB ingress controller as described in https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html """ + def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config @@ -41,9 +52,9 @@ def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config["clusterId"]) - if get_cluster_generic_property(dss_cluster_settings, 'alb-ingress.controller', 'false') == 'true': + if get_cluster_generic_property(dss_cluster_settings, "alb-ingress.controller", "false") == "true": raise Exception("ALB controller already installed, remove first") # retrieve the actual name in the cluster's data @@ -53,65 +64,70 @@ def run(self, progress_callback): if cluster_def is None: raise Exception("No cluster definition (starting failed?)") cluster_id = cluster_def["Name"] - kube_config_path = dss_cluster_settings.get_raw()['containerSettings']['executionConfigsGenericOverrides']['kubeConfigPath'] - connection_info = get_connection_info(dss_cluster_config.get('config')) - + kube_config_path = dss_cluster_settings.get_raw()["containerSettings"]["executionConfigsGenericOverrides"]["kubeConfigPath"] + connection_info = get_connection_info(dss_cluster_config.get("config")) + env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path + env["KUBECONFIG"] = kube_config_path command_outputs = [] keep_going = True - + # setup iam stuff in eksctl - args = ['utils', 'associate-iam-oidc-provider', '--approve'] - #args = args + ['-v', '4'] - args = args + ['--cluster', cluster_id] + args = ["utils", "associate-iam-oidc-provider", "--approve"] + # args = args + ['-v', '4'] + args = args + ["--cluster", cluster_id] args = args + get_region_arg(connection_info) c = EksctlCommand(args, connection_info) command_outputs.append(c.run()) if command_outputs[-1][1] != 0: return make_html(command_outputs) - + # checking if we need to create the policy - policy_name = self.config.get('policyName', 'ALBIngressControllerIAMPolicy') - - args = ['iam', 'list-policies'] + policy_name = self.config.get("policyName", "ALBIngressControllerIAMPolicy") + + args = ["iam", "list-policies"] args = args + get_region_arg(connection_info) c = AwsCommand(args, connection_info) command_outputs.append(c.run()) if command_outputs[-1][1] != 0: return make_html(command_outputs) - + policy_arn = None - for policy in json.loads(command_outputs[-1][2])['Policies']: - if policy.get('PolicyName', None) == policy_name: - policy_arn = policy.get('Arn', None) + for policy in json.loads(command_outputs[-1][2])["Policies"]: + if policy.get("PolicyName", None) == policy_name: + policy_arn = policy.get("Arn", None) if policy_arn is None: if not self.config.get("createPolicy", False): raise Exception("Policy %s doesn't exist and the macro isn't allowed to create it" % policy_name) # create the policy - policy_document_url = 
'https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.8/docs/examples/iam-policy.json' + policy_document_url = "https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.8/docs/examples/iam-policy.json" policy_document = requests.get(policy_document_url).text with open("policy.json", "w") as p: p.write(policy_document) - - args = ['iam', 'create-policy'] - args = args + ['--policy-name', policy_name] - args = args + ['--policy-document', 'file://policy.json'] + + args = ["iam", "create-policy"] + args = args + ["--policy-name", policy_name] + args = args + ["--policy-document", "file://policy.json"] args = args + get_region_arg(connection_info) c = AwsCommand(args, connection_info) command_outputs.append(c.run()) if command_outputs[-1][1] != 0: return make_html(command_outputs) - - policy_arn = json.loads(command_outputs[-1][2])['Policy'].get('Arn', None) + + policy_arn = json.loads(command_outputs[-1][2])["Policy"].get("Arn", None) # create the role on the cluster - cmd = ['kubectl', 'apply', '-f', 'https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/rbac-role.yaml'] + cmd = [ + "kubectl", + "apply", + "-f", + "https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/rbac-role.yaml", + ] logging.info("Run : %s" % json.dumps(cmd)) try: out, err = run_with_timeout(cmd, env=env, timeout=100) @@ -124,13 +140,13 @@ def run(self, progress_callback): return make_html(command_outputs) # attach the role to the policy - - args = ['create', 'iamserviceaccount', '--override-existing-serviceaccounts', '--approve'] - #args = args + ['-v', '4'] - args = args + ['--name', 'alb-ingress-controller'] # that's the name in the rbac-role.yaml - args = args + ['--namespace', 'kube-system'] # that's the name in the rbac-role.yaml - args = args + ['--cluster', cluster_id] - args = args + ['--attach-policy-arn', policy_arn] + + args = ["create", "iamserviceaccount", "--override-existing-serviceaccounts", "--approve"] + # args = args + ['-v', '4'] + args = args + ["--name", "alb-ingress-controller"] # that's the name in the rbac-role.yaml + args = args + ["--namespace", "kube-system"] # that's the name in the rbac-role.yaml + args = args + ["--cluster", cluster_id] + args = args + ["--attach-policy-arn", policy_arn] args = args + get_region_arg(connection_info) c = EksctlCommand(args, connection_info) @@ -138,17 +154,19 @@ def run(self, progress_callback): if command_outputs[-1][1] != 0: return make_html(command_outputs) - r = requests.get('https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/alb-ingress-controller.yaml') + r = requests.get( + "https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/alb-ingress-controller.yaml" + ) service_data = r.text - cluster_flag_pattern = '#.*cluster\\-name=.*' - cluster_flag_replacement = '- --cluster-name=%s' % cluster_id + cluster_flag_pattern = "#.*cluster\\-name=.*" + cluster_flag_replacement = "- --cluster-name=%s" % cluster_id service_data = re.sub(cluster_flag_pattern, cluster_flag_replacement, service_data) - + print(service_data) - with open('./alb-ingress-controller.yaml', 'w') as f: + with open("./alb-ingress-controller.yaml", "w") as f: f.write(service_data) - - cmd = ['kubectl', 'apply', '-f', './alb-ingress-controller.yaml'] + + cmd = ["kubectl", "apply", "-f", "./alb-ingress-controller.yaml"] logging.info("Run : %s" % json.dumps(cmd)) 
try: out, err = run_with_timeout(cmd, env=env, timeout=100) @@ -161,15 +179,15 @@ def run(self, progress_callback): return make_html(command_outputs) if self.config.get("tagSubnets", False): - networking_settings = dss_cluster_config.get('config', {}).get('networkingSettings', {}) - subnets = networking_settings.get('subnets', []) - if networking_settings.get('privateNetworking', False): - private_subnets = dss_cluster_config.get('config', {}).get('networkingSettings', {}).get('privateSubnets', []) + networking_settings = dss_cluster_config.get("config", {}).get("networkingSettings", {}) + subnets = networking_settings.get("subnets", []) + if networking_settings.get("privateNetworking", False): + private_subnets = dss_cluster_config.get("config", {}).get("networkingSettings", {}).get("privateSubnets", []) else: private_subnets = [] - + def add_tags(resources, tag, connection_info, command_outputs): - args = ['ec2', 'create-tags'] + args = ["ec2", "create-tags"] args = args + get_region_arg(connection_info) args = args + ["--resources"] + resources args = args + ["--tags", tag] @@ -178,12 +196,12 @@ def add_tags(resources, tag, connection_info, command_outputs): command_outputs.append(c.run()) if command_outputs[-1][1] != 0: return make_html(command_outputs) - + if len(subnets) > 0: - add_tags(subnets, 'Key=kubernetes.io/role/elb,Value=1', connection_info, command_outputs) + add_tags(subnets, "Key=kubernetes.io/role/elb,Value=1", connection_info, command_outputs) if len(private_subnets) > 0: - add_tags(private_subnets, 'Key=kubernetes.io/role/internal-elb,Value=1', connection_info, command_outputs) - - set_cluster_generic_property(dss_cluster_settings, 'alb-ingress.controller', 'true', True) + add_tags(private_subnets, "Key=kubernetes.io/role/internal-elb,Value=1", connection_info, command_outputs) + + set_cluster_generic_property(dss_cluster_settings, "alb-ingress.controller", "true", True) - return make_html(command_outputs) \ No newline at end of file + return make_html(command_outputs) diff --git a/python-runnables/remove-alb-controller/runnable.py b/python-runnables/remove-alb-controller/runnable.py index 2a82477..9b51c17 100644 --- a/python-runnables/remove-alb-controller/runnable.py +++ b/python-runnables/remove-alb-controller/runnable.py @@ -1,37 +1,45 @@ from dataiku.runnables import Runnable -import dataiku -import json, logging, os, re, tempfile, time -import requests +import json +import logging +import os from dku_aws.eksctl_command import EksctlCommand -from dku_aws.aws_command import AwsCommand from dku_utils.cluster import get_cluster_from_dss_cluster, get_cluster_generic_property, set_cluster_generic_property, get_connection_info from dku_kube.kubectl_command import run_with_timeout, KubeCommandException from dku_utils.access import _is_none_or_blank from dku_utils.config_parser import get_region_arg + def make_html(command_outputs): divs = [] for command_output in command_outputs: - cmd_html = '
Run: %s' % json.dumps(command_output[0]) - rv_html = 'Returned %s' % command_output[1] - out_html = 'Output\n%s' % command_output[2] - err_html = 'Error\n%s' % command_output[3] + cmd_html = "Run: %s" % json.dumps(command_output[0]) + rv_html = "Returned %s" % command_output[1] + out_html = ( + 'Output\n%s' + % command_output[2] + ) + err_html = ( + 'Error\n%s
' + % command_output[3] + ) divs.append(cmd_html) divs.append(rv_html) divs.append(out_html) if command_output[1] != 0 and not _is_none_or_blank(command_output[3]): divs.append(err_html) - html = '\n'.join(divs) + html = "\n".join(divs) try: - html.decode('utf8') + html.decode("utf8") except (UnicodeDecodeError, AttributeError): pass return html + class RemoveAlb(Runnable): """ Installs a ALB ingress controller as described in https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html """ + def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config @@ -41,9 +49,9 @@ def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config["clusterId"]) - if get_cluster_generic_property(dss_cluster_settings, 'alb-ingress.controller', 'false') != 'true': + if get_cluster_generic_property(dss_cluster_settings, "alb-ingress.controller", "false") != "true": raise Exception("ALB controller not installed (or not by the installation macro)") # retrieve the actual name in the cluster's data @@ -53,17 +61,22 @@ def run(self, progress_callback): if cluster_def is None: raise Exception("No cluster definition (starting failed?)") cluster_id = cluster_def["Name"] - kube_config_path = dss_cluster_settings.get_raw()['containerSettings']['executionConfigsGenericOverrides']['kubeConfigPath'] - connection_info = get_connection_info(dss_cluster_config.get('config')) + kube_config_path = dss_cluster_settings.get_raw()["containerSettings"]["executionConfigsGenericOverrides"]["kubeConfigPath"] + connection_info = get_connection_info(dss_cluster_config.get("config")) env = os.environ.copy() - env['KUBECONFIG'] = kube_config_path + env["KUBECONFIG"] = kube_config_path command_outputs = [] keep_going = True - + # delete the controller - cmd = ['kubectl', 'delete', '-f', 'https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/alb-ingress-controller.yaml'] + cmd = [ + "kubectl", + "delete", + "-f", + "https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/alb-ingress-controller.yaml", + ] logging.info("Run : %s" % json.dumps(cmd)) try: out, err = run_with_timeout(cmd, env=env, timeout=100) @@ -76,20 +89,25 @@ def run(self, progress_callback): return make_html(command_outputs) # detach the role from the policy - args = ['delete', 'iamserviceaccount'] - #args = args + ['-v', '4'] - args = args + ['--name', 'alb-ingress-controller'] # that's the name in the rbac-role.yaml - args = args + ['--namespace', 'kube-system'] # that's the name in the rbac-role.yaml - args = args + ['--cluster', cluster_id] + args = ["delete", "iamserviceaccount"] + # args = args + ['-v', '4'] + args = args + ["--name", "alb-ingress-controller"] # that's the name in the rbac-role.yaml + args = args + ["--namespace", "kube-system"] # that's the name in the rbac-role.yaml + args = args + ["--cluster", cluster_id] args = args + get_region_arg(connection_info) c = EksctlCommand(args, connection_info) command_outputs.append(c.run()) if command_outputs[-1][1] != 0: return make_html(command_outputs) - + # delete the role on the cluster - cmd = ['kubectl', 'delete', '-f', 'https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/rbac-role.yaml'] + cmd = [ + 
"kubectl", + "delete", + "-f", + "https://raw.githubusercontent.com/kubernetes-sigs/aws-alb-ingress-controller/v1.1.4/docs/examples/rbac-role.yaml", + ] logging.info("Run : %s" % json.dumps(cmd)) try: out, err = run_with_timeout(cmd, env=env, timeout=100) @@ -97,6 +115,6 @@ def run(self, progress_callback): except KubeCommandException as e: command_outputs.append((cmd, e.rv, e.out, e.err)) - set_cluster_generic_property(dss_cluster_settings, 'alb-ingress.controller', 'false', True) + set_cluster_generic_property(dss_cluster_settings, "alb-ingress.controller", "false", True) - return make_html(command_outputs) \ No newline at end of file + return make_html(command_outputs) diff --git a/python-runnables/resize-node-pool/runnable.py b/python-runnables/resize-node-pool/runnable.py index 78f5cbb..56a9e73 100644 --- a/python-runnables/resize-node-pool/runnable.py +++ b/python-runnables/resize-node-pool/runnable.py @@ -1,22 +1,22 @@ from dataiku.runnables import Runnable -import dataiku -import os, json, logging +import json +import logging from dku_aws.eksctl_command import EksctlCommand -from dku_aws.aws_command import AwsCommand from dku_utils.cluster import get_cluster_from_dss_cluster, get_connection_info from dku_utils.config_parser import get_region_arg + class MyRunnable(Runnable): def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config self.plugin_config = plugin_config - + def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, dss_cluster_config = get_cluster_from_dss_cluster(self.config["clusterId"]) # retrieve the actual name in the cluster's data if cluster_data is None: @@ -26,47 +26,48 @@ def run(self, progress_callback): raise Exception("No cluster definition (starting failed?)") cluster_id = cluster_def["Name"] - connection_info = get_connection_info(dss_cluster_config.get('config')) - - node_group_id = self.config.get('nodeGroupId', None) + connection_info = get_connection_info(dss_cluster_config.get("config")) + + node_group_id = self.config.get("nodeGroupId", None) if node_group_id is None or len(node_group_id) == 0: - args = ['get', 'nodegroup'] - #args = args + ['-v', '4'] - args = args + ['--cluster', cluster_id] + args = ["get", "nodegroup"] + # args = args + ['-v', '4'] + args = args + ["--cluster", cluster_id] args = args + get_region_arg(connection_info) - args = args + ['-o', 'json'] + args = args + ["-o", "json"] c = EksctlCommand(args, connection_info) node_groups = json.loads(c.run_and_get_output()) - node_group_ids = [node_group['Name'] for node_group in node_groups] + node_group_ids = [node_group["Name"] for node_group in node_groups] if len(node_group_ids) != 1: - raise Exception("Cluster has %s node groups, cannot resize. Specify a node group explicitely among %s" % (len(node_group_ids), json.dumps(node_group_ids))) + raise Exception( + "Cluster has %s node groups, cannot resize. 
Specify a node group explicitly among %s" + % (len(node_group_ids), json.dumps(node_group_ids)) + ) node_group_id = node_group_ids[0] - args = ['get', 'nodegroup'] - #args = args + ['-v', '4'] - args = args + ['--cluster', cluster_id] - args = args + ['--name', node_group_id] + args = ["get", "nodegroup"] + # args = args + ['-v', '4'] + args = args + ["--cluster", cluster_id] + args = args + ["--name", node_group_id] args = args + get_region_arg(connection_info) - args = args + ['-o', 'json'] + args = args + ["-o", "json"] c = EksctlCommand(args, connection_info) node_group_batch = json.loads(c.run_and_get_output()) if len(node_group_batch) == 0: raise Exception("Unable to retrieve info of node group %s" % node_group_id) - node_group = node_group_batch[0] - - desired_count = self.config['numNodes'] + desired_count = self.config["numNodes"] logging.info("Resize to %s" % desired_count) if desired_count == 0: - args = ['delete', 'nodegroup'] - args = args + ['-v', '4'] - args = args + ['--cluster', cluster_id] - args = args + ['--name', node_group_id] + args = ["delete", "nodegroup"] + args = args + ["-v", "4"] + args = args + ["--cluster", cluster_id] + args = args + ["--name", node_group_id] args = args + get_region_arg(connection_info) - if self.config.get('wait', False): - args = args + ['--wait'] # wait until resources are completely deleted + if self.config.get("wait", False): + args = args + ["--wait"] # wait until resources are completely deleted c = EksctlCommand(args, connection_info) rv, out, err = c.run_and_get() @@ -76,22 +77,22 @@ def run(self, progress_callback): else: logging.info("Cluster node group failed to delete") return '
Failed to delete the node group\n%s
' % (err) - + else: - args = ['scale', 'nodegroup'] - args = args + ['-v', '4'] - args = args + ['--cluster', cluster_id] - args = args + ['--name', node_group_id] - args = args + ['--nodes', str(desired_count)] - desired_min_count = self.config.get('minNumNodes', -1) - desired_max_count = self.config.get('maxNumNodes', -1) + args = ["scale", "nodegroup"] + args = args + ["-v", "4"] + args = args + ["--cluster", cluster_id] + args = args + ["--name", node_group_id] + args = args + ["--nodes", str(desired_count)] + desired_min_count = self.config.get("minNumNodes", -1) + desired_max_count = self.config.get("maxNumNodes", -1) if desired_min_count > 0: - args = args + ['--nodes-min', str(desired_min_count)] + args = args + ["--nodes-min", str(desired_min_count)] if desired_max_count > 0: - args = args + ['--nodes-max', str(desired_max_count)] + args = args + ["--nodes-max", str(desired_max_count)] args = args + get_region_arg(connection_info) - if self.config.get('wait', False): - args = args + ['--wait'] # wait until resources are completely scaled + if self.config.get("wait", False): + args = args + ["--wait"] # wait until resources are completely scaled c = EksctlCommand(args, connection_info) rv, out, err = c.run_and_get() @@ -101,6 +102,3 @@ def run(self, progress_callback): else: logging.info("Cluster node group failed to resize") return '
Failed to resize the node group\n%s
' % (err) - - - diff --git a/python-runnables/test-network/runnable.py b/python-runnables/test-network/runnable.py index 743f33c..e370d80 100644 --- a/python-runnables/test-network/runnable.py +++ b/python-runnables/test-network/runnable.py @@ -1,25 +1,30 @@ from dataiku.runnables import Runnable -import os, sys, json, yaml, random, subprocess, socket, re, traceback, ipaddress +import os +import json +import socket +import re +import traceback +import ipaddress from dku_kube.busybox_pod import BusyboxPod from dku_kube.kubectl_command import KubeCommandException from dku_utils.cluster import get_cluster_from_dss_cluster from six import text_type -class MyRunnable(Runnable): +class MyRunnable(Runnable): def __init__(self, project_key, config, plugin_config): self.project_key = project_key self.config = config self.plugin_config = plugin_config - + def get_progress_target(self): return None def run(self, progress_callback): - cluster_data, dss_cluster_settings, _ = get_cluster_from_dss_cluster(self.config['clusterId']) + cluster_data, dss_cluster_settings, _ = get_cluster_from_dss_cluster(self.config["clusterId"]) # the cluster is accessible via the kubeconfig - kube_config_path = dss_cluster_settings.get_raw()['containerSettings']['executionConfigsGenericOverrides']['kubeConfigPath'] + kube_config_path = dss_cluster_settings.get_raw()["containerSettings"]["executionConfigsGenericOverrides"]["kubeConfigPath"] # retrieve the actual name in the cluster's data if cluster_data is None: @@ -27,59 +32,75 @@ def run(self, progress_callback): cluster_def = cluster_data.get("cluster", None) if cluster_def is None: raise Exception("No cluster definition (starting failed?)") - - result = '' - - host = os.environ.get('DKU_BACKEND_EXT_HOST', socket.gethostname()) - port = os.environ['DKU_BACKEND_PORT'] - result = result + '
Checking connectivity to %s:%s from pod in cluster' % (host, port) - + + result = "" + + host = os.environ.get("DKU_BACKEND_EXT_HOST", socket.gethostname()) + port = os.environ["DKU_BACKEND_PORT"] + result = result + "Checking connectivity to %s:%s from pod in cluster
" % (host, port) + def add_to_result(result, op, cmd, out, err): - return result + '
%s\nCommand\n%s\nOutput\n%s\nError\n%s' % (op, json.dumps(cmd), out, err) + return ( + result + + '%s\nCommand\n%s\nOutput\n%s\nError\n%s
' # noqa E501 + % (op, json.dumps(cmd), out, err) + ) try: # sanity check - if host.startswith("127.0.0") or 'localhost' in host: - raise Exception('Host appears to not be a public hostname. Set DKU_BACKEND_EXT_HOST') + if host.startswith("127.0.0") or "localhost" in host: + raise Exception("Host appears to not be a public hostname. Set DKU_BACKEND_EXT_HOST") with BusyboxPod(kube_config_path) as b: try: ip = text_type(ipaddress.ip_address((host))) - result = result + '
Host %s is an ip. No need to resolve it, testing connection directly' % (host) + result = result + "Host %s is an ip. No need to resolve it, testing connection directly
" % (host) except ValueError: # check that the pod resolved the hostname ip = None - cmd = ['nslookup', host] + cmd = ["nslookup", host] out, err = b.exec_cmd(cmd) - result = result + '
Resolve host\nCommand\n%s\nOutput\n%s\nError\n%s' % (json.dumps(cmd), out, err) + result = ( + result + + 'Resolve host\nCommand\n%s\nOutput\n%s\nError\n%s
' # noqa E501 + % (json.dumps(cmd), out, err) + ) + for line in out.split("\n"): # always capture the last IP in the nslookup response independently from the fact that it may be followed by a DNS name or not # Examples of matches: # Address 1: 10.0.12.28 -captured-> 10.0.12.28 # or # Address 1: 10.0.12.28 ip-10-0-12-28.eu-west-1.compute.internal -captured-> 10.0.12.28 - m = re.match('^Address.*\\s(\\d+\\.\\d+\\.\\d+\\.\\d+)', line) + m = re.match("^Address.*\\s(\\d+\\.\\d+\\.\\d+\\.\\d+)", line) if m is not None: ip = m.group(1) if ip is None: - raise Exception('Hostname resolution of DSS node failed: %s' % out) - - result = result + '
Host %s resolved to %s' % (host, ip) + result = result + "Host %s resolved to %s
" % (host, ip) # try to connect on the backend port - cmd = ['nc', '-vz', ip, port, '-w', '5'] + cmd = ["nc", "-vz", ip, port, "-w", "5"] out, err = b.exec_cmd(cmd, timeout=10) - result = result + '
Test connection to port\nCommand\n%s\nDebug (stderr)\n%s' % (json.dumps(cmd), err) + result = ( + result + + 'Test connection to port\nCommand\n%s\nDebug (stderr)\n%s
' # noqa E501 + % (json.dumps(cmd), err) + ) + if "no route to host" in err.lower(): raise Exception("DSS node resolved but unreachable on port %s : %s" % (str(port), err)) - result = result + '
Connection successful' + result = result + "Connection successful
" except KubeCommandException as e: traceback.print_exc() - result = result + '
%s\nout:\n%s\nerr:\n%s' % (str(e), e.out, e.err) + result = result + '%s\nout:\n%s\nerr:\n%s
' % ( + str(e), + e.out, + e.err, + ) except Exception as e: traceback.print_exc() result = result + '
%s' % str(e) - - return '%s' % result + + return "%s
" % result