diff --git a/.travis.yml b/.travis.yml index 6ad9a6d..fbf80e3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,8 @@ before_script: cd app script: - flake8 --exclude=\.eggs,tests,docs --ignore=E124,E303,W504 --max-line-length 80 . + - python3 ../tests/UnitTest.py + deploy: provider: pypi diff --git a/README.md b/README.md index b78f350..5fb0448 100644 --- a/README.md +++ b/README.md @@ -103,9 +103,33 @@ username=username,
password= hashed_password,
email=email,
status=OK
+}

+Upload/create credentials
+POST /users/username/credentials
+RequestBody:
+{ file : text or attached file
+file_name : name,
+encrypted: boolean,
+vault_pass: ansible vault password
}
- - +return : response with successful credential upload status
+
+List credentials
+GET /users/username/credentials/file_name
+return : response with encrypted credentials from file
+ +Delete credentials
+DELETE /users/username/credentials/file-name
+return : response with successful delete status
+
+Update credentials
+PUT /users/username/credentials/file_name
+RequestBody: { file : updated text or updated attached file
+encrypted: boolean,
+vault_pass: ansible vault password
+}
+return : response with successful credential update status
+
## Linchpin Project LinchPin is a simple cloud orchestration tool. Its intended purpose is managing cloud resources across multiple infrastructures. These resources can be provisioned, decommissioned, and configured all using declarative data and a simple command-line interface. diff --git a/app/ /44ad6c91-5191-43aa-8fa0-cd3fd2fc2e0d_dbtest/dummy/inventories/dummy_cluster.inventory2 b/app/ /44ad6c91-5191-43aa-8fa0-cd3fd2fc2e0d_dbtest/dummy/inventories/dummy_cluster.inventory2 deleted file mode 100644 index e69de29..0000000 diff --git a/app/__init__.py b/app/__init__.py index 8e4cdca..a6ecf81 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -7,13 +7,14 @@ import shutil import logging import subprocess +from ansible_vault import Vault from app.response_messages import response, errors from logging.handlers import RotatingFileHandler from flask import Flask, jsonify, request, Response, abort, make_response from werkzeug.security import generate_password_hash, check_password_hash from flask_swagger_ui import get_swaggerui_blueprint from functools import wraps -from app.utils import get_connection, create_fetch_cmd, create_cmd_workspace,\ +from app.utils import get_connection, create_fetch_cmd, create_cmd_workspace, \ create_cmd_up_pinfile, check_workspace_empty, get_connection_users, \ create_admin_user, check_workspace_has_pinfile @@ -22,13 +23,12 @@ APP_DIR = os.path.dirname(os.path.realpath(__file__)) try: - with open(APP_DIR + '/config.yml', 'r') as f: + with open(APP_DIR + 'config.yml', 'r') as f: config = yaml.load(f) except Exception as x: config = {} app.logger.error(x) - # loads defaults when config.yml does not exists or has been removed WORKSPACE_DIR = config.get('workspace_path', '/tmp') LOGGER_FILE = config.get('logger_file_name', 'restylinchpin.log') @@ -41,6 +41,7 @@ ADMIN_USERNAME = config.get('admin_username', 'admin') ADMIN_PASSWORD = config.get('admin_password', 'password') ADMIN_EMAIL = config.get('admin_email', 'email') +CREDS_PATH = config.get('creds_path', '/tmp') # URL for exposing Swagger UI (without trailing '/') SWAGGER_URL = '/api/docs' @@ -81,6 +82,7 @@ def decorated(*args, **kwargs): except Exception as e: return jsonify(message=response.API_KEY_INVALID, status=e) return function(current_user, *args, **kwargs) + return decorated @@ -101,7 +103,7 @@ def new_user(current_user): email = request.json.get('email') api_key = str(uuid.uuid4()) if username is None or password is None: - abort(errors.ERROR_STATUS) # missing arguments + abort(errors.ERROR_STATUS) # missing arguments if db_con.db_get_username(username): return jsonify(message=response.USER_ALREADY_EXISTS) hashed_password = generate_password_hash(password, method='sha256') @@ -355,8 +357,8 @@ def linchpin_init(current_user) -> Response: else: # Checking if workspace name contains any special characters output = subprocess.Popen(["linchpin", "-w " + - WORKSPACE_DIR + identity + - "/", "init"], stdout=subprocess.PIPE) + WORKSPACE_DIR + identity + + "/", "init"], stdout=subprocess.PIPE) db_con.db_update(identity, response.WORKSPACE_SUCCESS) return jsonify(name=data["name"], id=identity, status=response.CREATE_SUCCESS, @@ -431,7 +433,7 @@ def linchpin_delete_workspace(current_user, identity) -> Response: db_con = get_connection(DB_PATH) try: # path specifying location of working directory inside server - workspace_owner_user =\ + workspace_owner_user = \ db_con.db_search_username(current_user['username']) if not current_user['admin'] and not workspace_owner_user: return jsonify(message=response.NOT_FOUND) @@ -481,7 +483,8 @@ def linchpin_fetch_workspace(current_user) -> Response: else: output = subprocess.Popen(cmd, stdout=subprocess.PIPE) output.communicate() - if check_workspace_empty(identity, WORKSPACE_PATH): + if check_workspace_empty(identity, WORKSPACE_PATH, + WORKSPACE_DIR): db_con.db_update(identity, response.WORKSPACE_FAILED) return jsonify(status=response.EMPTY_WORKSPACE) @@ -500,9 +503,9 @@ def linchpin_fetch_workspace(current_user) -> Response: message=errors.KEY_ERROR_PARAMS_FETCH) -@app.route('/api/v1.0/workspaces/up', methods=['POST']) +@app.route('/api/v1.0/users//workspaces/up', methods=['POST']) @auth_required -def linchpin_up(current_user) -> Response: +def linchpin_up(current_user, username) -> Response: """ POST request route for provisioning workspaces/pinFile already created @@ -516,29 +519,53 @@ def linchpin_up(current_user) -> Response: """ identity = None db_con = get_connection(DB_PATH) + db_con_users = get_connection_users(DB_PATH) try: workspace = db_con.db_search_username(current_user['username']) + user = db_con_users.db_search_name(username) + if not current_user['username'] == username \ + and not current_user['admin']: + return jsonify(message=errors.UNAUTHORIZED_REQUEST) if not current_user['admin'] and not workspace: return jsonify(message=response.NOT_FOUND) data = request.json # Get request body provision_type = data['provision_type'] + creds_path = WORKSPACE_PATH + CREDS_PATH + user['creds_folder'] if provision_type == "workspace": identity = data['id'] if not current_user['admin']: workspace = db_con.db_search_identity(identity) + if not db_con.db_search(workspace['name'], current_user['admin'], current_user['username']): return jsonify(message=response.NOT_FOUND) if not os.path.exists(WORKSPACE_PATH + "/" + identity): return jsonify(status=response.NOT_FOUND) + if 'pinfile_path' in data: + pinfile_path = data['pinfile_path'] + check_path = identity + pinfile_path + else: + check_path = identity + if 'pinfile_name' in data: + pinfile_name = data['pinfile_name'] + else: + pinfile_name = "PinFile" + if not check_workspace_has_pinfile(check_path, pinfile_name, + WORKSPACE_PATH): + return jsonify(status=response.PINFILE_NOT_FOUND) + cmd = create_cmd_workspace(data, identity, "up", - WORKSPACE_PATH, WORKSPACE_DIR) + WORKSPACE_PATH, WORKSPACE_DIR, + creds_path) elif provision_type == "pinfile": if 'name' in data: identity = str(uuid.uuid4()) + "_" + data['name'] else: identity = str(uuid.uuid4()) + pinfile_content = data['pinfile_content'] + json_pinfile_path = \ + WORKSPACE_PATH + "/" + identity + PINFILE_JSON_PATH precmd = ["linchpin", "-w " + WORKSPACE_DIR + identity + "/", "init"] output = subprocess.Popen(precmd, stdout=subprocess.PIPE) @@ -546,8 +573,11 @@ def linchpin_up(current_user) -> Response: db_con.db_insert_no_name(identity, response.WORKSPACE_REQUESTED, current_user['username']) - cmd = create_cmd_up_pinfile(data, identity, WORKSPACE_PATH, - WORKSPACE_DIR, PINFILE_JSON_PATH) + with open(json_pinfile_path, 'w') as json_data: + json.dump(pinfile_content, json_data) + cmd = create_cmd_up_pinfile(data, identity, + WORKSPACE_DIR, + creds_path) else: raise ValueError output = subprocess.Popen(cmd, stdout=subprocess.PIPE) @@ -576,9 +606,9 @@ def linchpin_up(current_user) -> Response: return jsonify(status=errors.ERROR_STATUS, message=str(e)) -@app.route('/api/v1.0/workspaces/destroy', methods=['POST']) +@app.route('/api/v1.0/users//workspaces/destroy', methods=['POST']) @auth_required -def linchpin_destroy(current_user) -> Response: +def linchpin_destroy(current_user, username) -> Response: """ POST request route for destroying workspaces/resources already created or provisioned @@ -587,19 +617,25 @@ def linchpin_destroy(current_user) -> Response: """ identity = None db_con = get_connection(DB_PATH) + db_con_users = get_connection_users(DB_PATH) try: workspace = db_con.db_search_username(current_user['username']) + user = db_con_users.db_search_name(username) + if not current_user['username'] == username \ + and not current_user['admin']: + return jsonify(message=errors.UNAUTHORIZED_REQUEST) if not current_user['admin'] and not workspace: return jsonify(message=response.NOT_FOUND) data = request.json # Get request body identity = data['id'] + creds_path = WORKSPACE_PATH + CREDS_PATH + user['creds_folder'] if not current_user['admin']: workspace = db_con.db_search_identity(identity) if not db_con.db_search(workspace['name'], current_user['admin'], current_user['username']): return jsonify(message=response.NOT_FOUND) cmd = create_cmd_workspace(data, identity, "destroy", WORKSPACE_PATH, - WORKSPACE_DIR) + WORKSPACE_DIR, creds_path) output = subprocess.Popen(cmd, stdout=subprocess.PIPE) output.communicate() db_con.db_update(identity, response.DESTROY_STATUS_SUCCESS) @@ -688,7 +724,7 @@ def get_linchpin_latest(current_user, identity) -> Response: else: check_path = "/" linchpin_latest_directory = WORKSPACE_PATH + "/" + identity + check_path - if not os.listdir(linchpin_latest_directory).\ + if not os.listdir(linchpin_latest_directory). \ __contains__(LINCHPIN_LATEST_NAME): return jsonify(message=response.LINCHPIN_LATEST_NOT_FOUND) linchpin_latest_path = linchpin_latest_directory + LINCHPIN_LATEST_NAME @@ -720,7 +756,8 @@ def get_linchpin_inventory(current_user, identity) -> Response: return jsonify(message=response.NOT_FOUND) if not current_user['admin']: workspace = db_con.db_search_identity(identity) - if not db_con.db_search(workspace['name'], current_user['admin'], + if not db_con.db_search(workspace['name'], + current_user['admin'], current_user['username']): return jsonify(message=response.NOT_FOUND) data = request.json @@ -746,6 +783,170 @@ def get_linchpin_inventory(current_user, identity) -> Response: return jsonify(status=errors.ERROR_STATUS, message=str(e)) +@app.route('/api/v1.0/users//credentials', methods=['POST']) +@auth_required +def upload_credentials(current_user, username) -> Response: + """ + POST request for uploading credentials to user account + RequestBody: { file : text or attached file + file_name : name, + encrypted: boolean, + vault_pass: ansible vault password + } + return : response with successful credential upload status + """ + db_con = get_connection_users(DB_PATH) + try: + user = db_con.db_search_name(username) + if not current_user['username'] == username \ + and not current_user['admin']: + return jsonify(message=errors.UNAUTHORIZED_REQUEST) + file_name = request.form['file_name'] + encrypted = request.form['encrypted'] + if 'creds_folder_name' in request.form: + creds_folder = request.form["creds_folder_name"] + else: + if user['creds_folder'] is None: + creds_folder = username + "_" + str(uuid.uuid4()) + db_con.db_update_creds_folder(username, + creds_folder) + os.makedirs(WORKSPACE_PATH + CREDS_PATH + creds_folder) + else: + creds_folder = user['creds_folder'] + if request.files: + file = request.files["file"] + file_read = file.read() + write = 'wb' + else: + file_read = request.form["file"] + write = 'w' + if encrypted.lower() in ("true", "t"): + with open(WORKSPACE_PATH + CREDS_PATH + creds_folder + + "/" + file_name + ".yml", write) as yaml_file: + yaml_file.write(file_read) + else: + vault_pass = request.form['vault_pass'] + vault = Vault(vault_pass) + vault.dump(file_read, open(WORKSPACE_PATH + CREDS_PATH + + creds_folder + "/" + file_name + + ".yml", 'wb')) + return jsonify(message=response.CREDENTIALS_UPLOADED) + except (KeyError, ValueError, TypeError): + return jsonify(status=errors.ERROR_STATUS, + message=errors.KEY_ERROR) + except Exception as e: + app.logger.error(e) + return jsonify(status=errors.ERROR_STATUS, message=str(e)) + + +@app.route('/api/v1.0/users//credentials/', + methods=['GET']) +@auth_required +def get_credentials(current_user, file_name, username) -> Response: + """ + GET request to retrieve credentials from a credential file + return : response with encrypted credentials from file + """ + db_con = get_connection_users(DB_PATH) + try: + user = db_con.db_search_name(username) + if not current_user['username'] == username \ + and not current_user['admin']: + return jsonify(message=errors.UNAUTHORIZED_REQUEST) + if not os.listdir(WORKSPACE_PATH + CREDS_PATH + user['creds_folder']). \ + __contains__(file_name): + return jsonify(message=response.CREDENTIALS_FILE_NOT_FOUND) + with open(WORKSPACE_PATH + CREDS_PATH + user['creds_folder'] + + "/" + file_name, 'r') as data: + credentials = data.read().replace('\n', ' ') + return jsonify(credentials=credentials) + except (KeyError, ValueError, TypeError): + return jsonify(status=errors.ERROR_STATUS, + message=errors.KEY_ERROR) + except Exception as e: + app.logger.error(e) + return jsonify(status=errors.ERROR_STATUS, message=str(e)) + + +@app.route('/api/v1.0/users//credentials/', + methods=['PUT']) +@auth_required +def update_credentials(current_user, username, file_name) -> Response: + """ + PUT request for updating credentials to user account + RequestBody: { file : updated text or updated attached file + encrypted: boolean, + vault_pass: ansible vault password + } + return : response with successful credential update status + """ + db_con = get_connection_users(DB_PATH) + try: + user = db_con.db_search_name(username) + if not current_user['username'] == username \ + and not current_user['admin']: + return jsonify(message=errors.UNAUTHORIZED_REQUEST) + if 'creds_folder_name' not in request.form: + creds_folder = user['creds_folder'] + else: + creds_folder = request.form['creds_folder_name'] + if not os.listdir(WORKSPACE_PATH + CREDS_PATH + creds_folder). \ + __contains__(file_name): + return jsonify(message=response.CREDENTIALS_FILE_NOT_FOUND) + if request.files: + file = request.files["file"] + file_read = file.read() + write = 'wb' + else: + file_read = request.form["file"] + write = 'w' + encrypted = request.form['encrypted'] + if encrypted.lower() in ("true", "t"): + with open(WORKSPACE_PATH + CREDS_PATH + creds_folder + + "/" + file_name, write) as yaml_file: + yaml_file.write(file_read) + else: + vault_pass = request.form['vault_pass'] + vault = Vault(vault_pass) + vault.dump(file_read, open(WORKSPACE_PATH + CREDS_PATH + + creds_folder + "/" + file_name, 'wb')) + return jsonify(message=response.CREDENTIALS_UPDATED) + except Exception as e: + app.logger.error(e) + return jsonify(status=errors.ERROR_STATUS, message=str(e)) + + +@app.route('/api/v1.0/users//credentials/', + methods=['DELETE']) +@auth_required +def delete_credentials(current_user, username, file_name) -> Response: + """ + DELETE request to delete a credential file + return : response with successful delete status + """ + db_con = get_connection_users(DB_PATH) + try: + user = db_con.db_search_name(username) + if not current_user['username'] == username \ + and not current_user['admin']: + return jsonify(message=errors.UNAUTHORIZED_REQUEST) + if 'creds_folder_name' not in request.form: + creds_folder = user['creds_folder'] + else: + creds_folder = request.form['creds_folder_name'] + for w in os.listdir(WORKSPACE_PATH + CREDS_PATH + + creds_folder): + if w == file_name: + os.remove(WORKSPACE_PATH + CREDS_PATH + + creds_folder + "/" + w) + return jsonify(status=response.CREDENTIALS_DELETED, + mimetype='application/json') + return jsonify(status=response.CREDENTIALS_FILE_NOT_FOUND) + except Exception as e: + app.logger.error(e) + return jsonify(status=errors.ERROR_STATUS, message=str(e)) + + if __name__ == "__main__": create_admin_user(DB_PATH, ADMIN_USERNAME, ADMIN_PASSWORD, ADMIN_EMAIL) diff --git a/app/config.yml b/app/config.yml index 16354a9..2a236dc 100644 --- a/app/config.yml +++ b/app/config.yml @@ -11,9 +11,11 @@ inventory_path: /dummy/inventories/* linchpin_latest_file_path: /dummy/resources/linchpin.latest # path to Pinfile for dummy workspace pinfile_json_path: /dummy/PinFile.json -#details for admin user login #file name for linchpin.latest file in resources linchpin_latest_name: linchpin.latest +#details for admin user login admin_username: admin admin_password: password -admin_email: admin@xyz \ No newline at end of file +admin_email: admin@xyz +# path to creds folder +creds_path: /tmp \ No newline at end of file diff --git a/app/data_access_layer/UserBaseDB.py b/app/data_access_layer/UserBaseDB.py index 0216bce..e2a86d1 100644 --- a/app/data_access_layer/UserBaseDB.py +++ b/app/data_access_layer/UserBaseDB.py @@ -42,3 +42,7 @@ def db_reset_api_key(self, username, new_api_key): @abstractmethod def db_update(self, username, updated_username, password_hash, email): pass + + @abstractmethod + def db_update_creds_folder(self, username, creds_folder): + pass diff --git a/app/data_access_layer/UserRestDB.py b/app/data_access_layer/UserRestDB.py index ba8300c..64a9821 100644 --- a/app/data_access_layer/UserRestDB.py +++ b/app/data_access_layer/UserRestDB.py @@ -24,9 +24,11 @@ def db_insert(self, username, password_hash, api_key_hash, admin user :param email: User email """ + creds_folder = None self.table.insert({'username': username, 'password': password_hash, 'api_key': api_key_hash, - 'email': email, 'admin': admin}) + 'email': email, 'admin': admin, + 'creds_folder': creds_folder}) def db_search_name(self, username) -> List[Dict]: """ @@ -36,7 +38,7 @@ def db_search_name(self, username) -> List[Dict]: param username """ user = Query() - return self.table.search(user.username == username) + return self.table.search(user.username == username)[0] def db_list_all(self) -> List[Dict]: """ @@ -113,3 +115,8 @@ def db_update(self, username, updated_username, password_hash, self.table.update({'username': updated_username, 'password': password_hash, 'email': email}, user.username == username) + + def db_update_creds_folder(self, username, creds_folder): + user = Query() + self.table.update({'creds_folder': creds_folder}, + user.username == username) diff --git a/app/response_messages/response.py b/app/response_messages/response.py index 3d8ecb8..7da667e 100644 --- a/app/response_messages/response.py +++ b/app/response_messages/response.py @@ -3,6 +3,7 @@ PROVISION_SUCCESS = "Workspace provisioned successfully" DESTROY_SUCCESS = "Workspace/resources destroyed successfully" NOT_FOUND = "Workspace does not exist" +NOT_FOUND_USER = "User account does not exist" DUPLICATE_WORKSPACE = "Workspace with same name found," \ "try again by renaming" EMPTY_WORKSPACE = "Only public repositories can be "\ @@ -19,6 +20,7 @@ LINCHPIN_LATEST_NOT_FOUND = "linchpin.latest not found. " \ "Please check that it exists"\ " or specify linchpin_latest_path in request" +CREDENTIALS_FILE_NOT_FOUND = "No credential file found with this name" DESTROY_STATUS_SUCCESS = "Destroyed" DESTROY_FAILED = "FAILED destroy" STATUS_OK = "200 OK" @@ -36,3 +38,6 @@ "the format route?api_key=value" MISSING_USERNAME = "please provide the username in request route in "\ "the format route?username=value" +CREDENTIALS_UPLOADED = "Credentials uploaded successfully" +CREDENTIALS_UPDATED = "Credentials updated sccessfully" +CREDENTIALS_DELETED = "Credentials deleted successfully" diff --git a/app/utils/__init__.py b/app/utils/__init__.py index 01ad95b..173ec59 100644 --- a/app/utils/__init__.py +++ b/app/utils/__init__.py @@ -1,10 +1,7 @@ import os -import json import uuid from app.data_access_layer import RestDB from app.data_access_layer import UserRestDB -from app.response_messages import response -from flask import jsonify from typing import List from werkzeug.security import generate_password_hash @@ -74,13 +71,15 @@ def create_fetch_cmd(data, identity, workspace_dir) -> List[str]: def create_cmd_workspace(data, identity, action, - workspace_path, workspace_dir) -> List[str]: + workspace_path, workspace_dir, + creds_folder_path) -> List[str]: """ Creates a list to feed the subprocess for provisioning/ destroying existing workspaces :param data: JSON data from POST requestBody :param identity: unique uuid_name assigned to the workspace :param action: up or destroy action + :param creds_folder_path: path to the credentials folder :return a list for the subprocess to run """ if 'pinfile_path' in data: @@ -89,14 +88,12 @@ def create_cmd_workspace(data, identity, action, else: check_path = identity cmd = ["linchpin", "-w " + workspace_dir + check_path] + if 'creds_path' in data: + cmd.extend(("--creds-path", data['creds_path'])) + else: + cmd.extend(("--creds-path", creds_folder_path)) if 'pinfile_name' in data: cmd.extend(("-p", data['pinfile_name'])) - pinfile_name = data['pinfile_name'] - else: - pinfile_name = "PinFile" - if not check_workspace_has_pinfile(check_path, pinfile_name, - workspace_path): - return jsonify(status=response.PINFILE_NOT_FOUND) cmd.append(action) if 'tx_id' in data: cmd.extend(("-t", data['tx_id'])) @@ -109,22 +106,23 @@ def create_cmd_workspace(data, identity, action, def create_cmd_up_pinfile(data, identity, - workspace_path, workspace_dir, - pinfile_json_path) -> List[str]: + creds_folder_path) -> List[str]: """ Creates a list to feed the subprocess for provisioning new workspaces instantiated using a pinfile :param data: JSON data from POST requestBody :param identity: unique uuid_name assigned to the workspace + :param creds_folder_path: path to the credentials folder :return a list for the subprocess to run """ - pinfile_content = data['pinfile_content'] - json_pinfile_path = workspace_path + "/" + identity + pinfile_json_path - with open(json_pinfile_path, 'w') as json_data: - json.dump(pinfile_content, json_data) cmd = ["linchpin", "-w " + workspace_dir + identity + "/dummy", "-p" + - "PinFile.json", "up"] + "PinFile.json"] + if 'creds_path' in data: + cmd.extend(("--creds-path", data['creds_path'])) + else: + cmd.extend(("--creds-path", creds_folder_path)) + cmd.append("up") if 'inventory_format' in data: cmd.extend(("--if", data['inventory_format'])) return cmd @@ -140,7 +138,7 @@ def check_workspace_has_pinfile(name, pinfile_name, workspace_path) -> bool: return os.listdir(workspace_path + "/" + name).__contains__(pinfile_name) -def check_workspace_empty(name, workspace_path) -> bool: +def check_workspace_empty(name, workspace_path, working_dir) -> bool: """ Verifies if a workspace fetched/created is empty :param name: name of the workspace to be verified diff --git a/requirements.txt b/requirements.txt index 6595851..74c1e19 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ Flask>=1.0.3 linchpin<=1.7.5 flask-swagger-ui>=3.20.9 flake8>=3.7.7 + ansible-vault>=1.2.0 \ No newline at end of file diff --git a/tests/UnitTest.py b/tests/UnitTest.py new file mode 100644 index 0000000..d4bdefb --- /dev/null +++ b/tests/UnitTest.py @@ -0,0 +1,55 @@ +import unittest +import app +from app.data_access_layer import RestDB +from app.data_access_layer import UserRestDB + +BASE_URL = 'http://localhost:5000/api/v1.0' + + +class UnitTest(unittest.TestCase): + + def setUp(self): + self.app = app.app.test_client() + self.app.testing = True + + def test_db_create_admin_user(self): + app.create_admin_user(app.DB_PATH, app.ADMIN_USERNAME, + app.ADMIN_PASSWORD, app.ADMIN_EMAIL) + self.assertEqual("admin", app.get_connection_users(app.DB_PATH).db_search_name("admin")['username']) + + def test_get_connection(self): + self.assertIsInstance(app.get_connection(app.DB_PATH), RestDB.RestDB) + + def test_get_connection_users(self): + self.assertIsInstance(app.get_connection_users(app.DB_PATH), UserRestDB.UserRestDB) + + def test_create_fetch_cmd(self): + data = {"name": "test", "url": "www.github.com/CentOS-PaaS-SIG/linchpin", + "rootfolder":"/"} + identity = "test123" + self.assertEqual(app.create_fetch_cmd(data, identity, app.WORKSPACE_DIR), + ["linchpin", "-w " + app.WORKSPACE_DIR + identity, "fetch", "--root", data['rootfolder'], + data['url']]) + + def test_create_cmd_workspace(self): + data = {"id": "test123", + "provision_type": "workspace", + "pinfile_path": "/dummy/" + } + action = "up" + self.assertEqual(app.create_cmd_workspace(data, data['id'], action, + app.WORKSPACE_PATH, app.WORKSPACE_DIR, + app.CREDS_PATH), ["linchpin", "-w " + app.WORKSPACE_DIR + data['id'] + data['pinfile_path'], + "--creds-path", "/", "up"]) + + def test_create_cmd_up_pinfile(self): + data = {"id": "UnitTest", + "provision_type": "pinfile", + "pinfile_content":{"test":"pinfile_data"}} + self.assertEqual(app.create_cmd_up_pinfile(data, data['id'], app.WORKSPACE_DIR, app.CREDS_PATH), + ["linchpin", "-w " + app.WORKSPACE_DIR + data['id'] + "/dummy", "-p" + + "PinFile.json", "--creds-path", "/", "up"]) + + +if __name__ == '__main__': + unittest.main()