diff --git a/.travis.yml b/.travis.yml
index 6ad9a6d..fbf80e3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,6 +10,8 @@ before_script: cd app
script:
- flake8 --exclude=\.eggs,tests,docs --ignore=E124,E303,W504 --max-line-length 80 .
+ - python3 ../tests/UnitTest.py
+
deploy:
provider: pypi
diff --git a/README.md b/README.md
index b78f350..5fb0448 100644
--- a/README.md
+++ b/README.md
@@ -103,9 +103,33 @@ username=username,
password= hashed_password,
email=email,
status=OK
+}
+Upload/create credentials
+POST /users/username/credentials
+RequestBody:
+{ file : text or attached file
+file_name : name,
+encrypted: boolean,
+vault_pass: ansible vault password
}
-
-
+return : response with successful credential upload status
+
+List credentials
+GET /users/username/credentials/file_name
+return : response with encrypted credentials from file
+
+Delete credentials
+DELETE /users/username/credentials/file-name
+return : response with successful delete status
+
+Update credentials
+PUT /users/username/credentials/file_name
+RequestBody: { file : updated text or updated attached file
+encrypted: boolean,
+vault_pass: ansible vault password
+}
+return : response with successful credential update status
+
## Linchpin Project
LinchPin is a simple cloud orchestration tool. Its intended purpose is managing cloud resources across multiple infrastructures. These resources can be provisioned, decommissioned, and configured all using declarative data and a simple command-line interface.
diff --git a/app/ /44ad6c91-5191-43aa-8fa0-cd3fd2fc2e0d_dbtest/dummy/inventories/dummy_cluster.inventory2 b/app/ /44ad6c91-5191-43aa-8fa0-cd3fd2fc2e0d_dbtest/dummy/inventories/dummy_cluster.inventory2
deleted file mode 100644
index e69de29..0000000
diff --git a/app/__init__.py b/app/__init__.py
index 8e4cdca..a6ecf81 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -7,13 +7,14 @@
import shutil
import logging
import subprocess
+from ansible_vault import Vault
from app.response_messages import response, errors
from logging.handlers import RotatingFileHandler
from flask import Flask, jsonify, request, Response, abort, make_response
from werkzeug.security import generate_password_hash, check_password_hash
from flask_swagger_ui import get_swaggerui_blueprint
from functools import wraps
-from app.utils import get_connection, create_fetch_cmd, create_cmd_workspace,\
+from app.utils import get_connection, create_fetch_cmd, create_cmd_workspace, \
create_cmd_up_pinfile, check_workspace_empty, get_connection_users, \
create_admin_user, check_workspace_has_pinfile
@@ -22,13 +23,12 @@
APP_DIR = os.path.dirname(os.path.realpath(__file__))
try:
- with open(APP_DIR + '/config.yml', 'r') as f:
+ with open(APP_DIR + 'config.yml', 'r') as f:
config = yaml.load(f)
except Exception as x:
config = {}
app.logger.error(x)
-
# loads defaults when config.yml does not exists or has been removed
WORKSPACE_DIR = config.get('workspace_path', '/tmp')
LOGGER_FILE = config.get('logger_file_name', 'restylinchpin.log')
@@ -41,6 +41,7 @@
ADMIN_USERNAME = config.get('admin_username', 'admin')
ADMIN_PASSWORD = config.get('admin_password', 'password')
ADMIN_EMAIL = config.get('admin_email', 'email')
+CREDS_PATH = config.get('creds_path', '/tmp')
# URL for exposing Swagger UI (without trailing '/')
SWAGGER_URL = '/api/docs'
@@ -81,6 +82,7 @@ def decorated(*args, **kwargs):
except Exception as e:
return jsonify(message=response.API_KEY_INVALID, status=e)
return function(current_user, *args, **kwargs)
+
return decorated
@@ -101,7 +103,7 @@ def new_user(current_user):
email = request.json.get('email')
api_key = str(uuid.uuid4())
if username is None or password is None:
- abort(errors.ERROR_STATUS) # missing arguments
+ abort(errors.ERROR_STATUS) # missing arguments
if db_con.db_get_username(username):
return jsonify(message=response.USER_ALREADY_EXISTS)
hashed_password = generate_password_hash(password, method='sha256')
@@ -355,8 +357,8 @@ def linchpin_init(current_user) -> Response:
else:
# Checking if workspace name contains any special characters
output = subprocess.Popen(["linchpin", "-w " +
- WORKSPACE_DIR + identity +
- "/", "init"], stdout=subprocess.PIPE)
+ WORKSPACE_DIR + identity +
+ "/", "init"], stdout=subprocess.PIPE)
db_con.db_update(identity, response.WORKSPACE_SUCCESS)
return jsonify(name=data["name"], id=identity,
status=response.CREATE_SUCCESS,
@@ -431,7 +433,7 @@ def linchpin_delete_workspace(current_user, identity) -> Response:
db_con = get_connection(DB_PATH)
try:
# path specifying location of working directory inside server
- workspace_owner_user =\
+ workspace_owner_user = \
db_con.db_search_username(current_user['username'])
if not current_user['admin'] and not workspace_owner_user:
return jsonify(message=response.NOT_FOUND)
@@ -481,7 +483,8 @@ def linchpin_fetch_workspace(current_user) -> Response:
else:
output = subprocess.Popen(cmd, stdout=subprocess.PIPE)
output.communicate()
- if check_workspace_empty(identity, WORKSPACE_PATH):
+ if check_workspace_empty(identity, WORKSPACE_PATH,
+ WORKSPACE_DIR):
db_con.db_update(identity,
response.WORKSPACE_FAILED)
return jsonify(status=response.EMPTY_WORKSPACE)
@@ -500,9 +503,9 @@ def linchpin_fetch_workspace(current_user) -> Response:
message=errors.KEY_ERROR_PARAMS_FETCH)
-@app.route('/api/v1.0/workspaces/up', methods=['POST'])
+@app.route('/api/v1.0/users//workspaces/up', methods=['POST'])
@auth_required
-def linchpin_up(current_user) -> Response:
+def linchpin_up(current_user, username) -> Response:
"""
POST request route for provisioning workspaces/pinFile already
created
@@ -516,29 +519,53 @@ def linchpin_up(current_user) -> Response:
"""
identity = None
db_con = get_connection(DB_PATH)
+ db_con_users = get_connection_users(DB_PATH)
try:
workspace = db_con.db_search_username(current_user['username'])
+ user = db_con_users.db_search_name(username)
+ if not current_user['username'] == username \
+ and not current_user['admin']:
+ return jsonify(message=errors.UNAUTHORIZED_REQUEST)
if not current_user['admin'] and not workspace:
return jsonify(message=response.NOT_FOUND)
data = request.json # Get request body
provision_type = data['provision_type']
+ creds_path = WORKSPACE_PATH + CREDS_PATH + user['creds_folder']
if provision_type == "workspace":
identity = data['id']
if not current_user['admin']:
workspace = db_con.db_search_identity(identity)
+
if not db_con.db_search(workspace['name'],
current_user['admin'],
current_user['username']):
return jsonify(message=response.NOT_FOUND)
if not os.path.exists(WORKSPACE_PATH + "/" + identity):
return jsonify(status=response.NOT_FOUND)
+ if 'pinfile_path' in data:
+ pinfile_path = data['pinfile_path']
+ check_path = identity + pinfile_path
+ else:
+ check_path = identity
+ if 'pinfile_name' in data:
+ pinfile_name = data['pinfile_name']
+ else:
+ pinfile_name = "PinFile"
+ if not check_workspace_has_pinfile(check_path, pinfile_name,
+ WORKSPACE_PATH):
+ return jsonify(status=response.PINFILE_NOT_FOUND)
+
cmd = create_cmd_workspace(data, identity, "up",
- WORKSPACE_PATH, WORKSPACE_DIR)
+ WORKSPACE_PATH, WORKSPACE_DIR,
+ creds_path)
elif provision_type == "pinfile":
if 'name' in data:
identity = str(uuid.uuid4()) + "_" + data['name']
else:
identity = str(uuid.uuid4())
+ pinfile_content = data['pinfile_content']
+ json_pinfile_path = \
+ WORKSPACE_PATH + "/" + identity + PINFILE_JSON_PATH
precmd = ["linchpin", "-w " + WORKSPACE_DIR + identity +
"/", "init"]
output = subprocess.Popen(precmd, stdout=subprocess.PIPE)
@@ -546,8 +573,11 @@ def linchpin_up(current_user) -> Response:
db_con.db_insert_no_name(identity,
response.WORKSPACE_REQUESTED,
current_user['username'])
- cmd = create_cmd_up_pinfile(data, identity, WORKSPACE_PATH,
- WORKSPACE_DIR, PINFILE_JSON_PATH)
+ with open(json_pinfile_path, 'w') as json_data:
+ json.dump(pinfile_content, json_data)
+ cmd = create_cmd_up_pinfile(data, identity,
+ WORKSPACE_DIR,
+ creds_path)
else:
raise ValueError
output = subprocess.Popen(cmd, stdout=subprocess.PIPE)
@@ -576,9 +606,9 @@ def linchpin_up(current_user) -> Response:
return jsonify(status=errors.ERROR_STATUS, message=str(e))
-@app.route('/api/v1.0/workspaces/destroy', methods=['POST'])
+@app.route('/api/v1.0/users//workspaces/destroy', methods=['POST'])
@auth_required
-def linchpin_destroy(current_user) -> Response:
+def linchpin_destroy(current_user, username) -> Response:
"""
POST request route for destroying workspaces/resources already created
or provisioned
@@ -587,19 +617,25 @@ def linchpin_destroy(current_user) -> Response:
"""
identity = None
db_con = get_connection(DB_PATH)
+ db_con_users = get_connection_users(DB_PATH)
try:
workspace = db_con.db_search_username(current_user['username'])
+ user = db_con_users.db_search_name(username)
+ if not current_user['username'] == username \
+ and not current_user['admin']:
+ return jsonify(message=errors.UNAUTHORIZED_REQUEST)
if not current_user['admin'] and not workspace:
return jsonify(message=response.NOT_FOUND)
data = request.json # Get request body
identity = data['id']
+ creds_path = WORKSPACE_PATH + CREDS_PATH + user['creds_folder']
if not current_user['admin']:
workspace = db_con.db_search_identity(identity)
if not db_con.db_search(workspace['name'], current_user['admin'],
current_user['username']):
return jsonify(message=response.NOT_FOUND)
cmd = create_cmd_workspace(data, identity, "destroy", WORKSPACE_PATH,
- WORKSPACE_DIR)
+ WORKSPACE_DIR, creds_path)
output = subprocess.Popen(cmd, stdout=subprocess.PIPE)
output.communicate()
db_con.db_update(identity, response.DESTROY_STATUS_SUCCESS)
@@ -688,7 +724,7 @@ def get_linchpin_latest(current_user, identity) -> Response:
else:
check_path = "/"
linchpin_latest_directory = WORKSPACE_PATH + "/" + identity + check_path
- if not os.listdir(linchpin_latest_directory).\
+ if not os.listdir(linchpin_latest_directory). \
__contains__(LINCHPIN_LATEST_NAME):
return jsonify(message=response.LINCHPIN_LATEST_NOT_FOUND)
linchpin_latest_path = linchpin_latest_directory + LINCHPIN_LATEST_NAME
@@ -720,7 +756,8 @@ def get_linchpin_inventory(current_user, identity) -> Response:
return jsonify(message=response.NOT_FOUND)
if not current_user['admin']:
workspace = db_con.db_search_identity(identity)
- if not db_con.db_search(workspace['name'], current_user['admin'],
+ if not db_con.db_search(workspace['name'],
+ current_user['admin'],
current_user['username']):
return jsonify(message=response.NOT_FOUND)
data = request.json
@@ -746,6 +783,170 @@ def get_linchpin_inventory(current_user, identity) -> Response:
return jsonify(status=errors.ERROR_STATUS, message=str(e))
+@app.route('/api/v1.0/users//credentials', methods=['POST'])
+@auth_required
+def upload_credentials(current_user, username) -> Response:
+ """
+ POST request for uploading credentials to user account
+ RequestBody: { file : text or attached file
+ file_name : name,
+ encrypted: boolean,
+ vault_pass: ansible vault password
+ }
+ return : response with successful credential upload status
+ """
+ db_con = get_connection_users(DB_PATH)
+ try:
+ user = db_con.db_search_name(username)
+ if not current_user['username'] == username \
+ and not current_user['admin']:
+ return jsonify(message=errors.UNAUTHORIZED_REQUEST)
+ file_name = request.form['file_name']
+ encrypted = request.form['encrypted']
+ if 'creds_folder_name' in request.form:
+ creds_folder = request.form["creds_folder_name"]
+ else:
+ if user['creds_folder'] is None:
+ creds_folder = username + "_" + str(uuid.uuid4())
+ db_con.db_update_creds_folder(username,
+ creds_folder)
+ os.makedirs(WORKSPACE_PATH + CREDS_PATH + creds_folder)
+ else:
+ creds_folder = user['creds_folder']
+ if request.files:
+ file = request.files["file"]
+ file_read = file.read()
+ write = 'wb'
+ else:
+ file_read = request.form["file"]
+ write = 'w'
+ if encrypted.lower() in ("true", "t"):
+ with open(WORKSPACE_PATH + CREDS_PATH + creds_folder +
+ "/" + file_name + ".yml", write) as yaml_file:
+ yaml_file.write(file_read)
+ else:
+ vault_pass = request.form['vault_pass']
+ vault = Vault(vault_pass)
+ vault.dump(file_read, open(WORKSPACE_PATH + CREDS_PATH +
+ creds_folder + "/" + file_name +
+ ".yml", 'wb'))
+ return jsonify(message=response.CREDENTIALS_UPLOADED)
+ except (KeyError, ValueError, TypeError):
+ return jsonify(status=errors.ERROR_STATUS,
+ message=errors.KEY_ERROR)
+ except Exception as e:
+ app.logger.error(e)
+ return jsonify(status=errors.ERROR_STATUS, message=str(e))
+
+
+@app.route('/api/v1.0/users//credentials/',
+ methods=['GET'])
+@auth_required
+def get_credentials(current_user, file_name, username) -> Response:
+ """
+ GET request to retrieve credentials from a credential file
+ return : response with encrypted credentials from file
+ """
+ db_con = get_connection_users(DB_PATH)
+ try:
+ user = db_con.db_search_name(username)
+ if not current_user['username'] == username \
+ and not current_user['admin']:
+ return jsonify(message=errors.UNAUTHORIZED_REQUEST)
+ if not os.listdir(WORKSPACE_PATH + CREDS_PATH + user['creds_folder']). \
+ __contains__(file_name):
+ return jsonify(message=response.CREDENTIALS_FILE_NOT_FOUND)
+ with open(WORKSPACE_PATH + CREDS_PATH + user['creds_folder'] +
+ "/" + file_name, 'r') as data:
+ credentials = data.read().replace('\n', ' ')
+ return jsonify(credentials=credentials)
+ except (KeyError, ValueError, TypeError):
+ return jsonify(status=errors.ERROR_STATUS,
+ message=errors.KEY_ERROR)
+ except Exception as e:
+ app.logger.error(e)
+ return jsonify(status=errors.ERROR_STATUS, message=str(e))
+
+
+@app.route('/api/v1.0/users//credentials/',
+ methods=['PUT'])
+@auth_required
+def update_credentials(current_user, username, file_name) -> Response:
+ """
+ PUT request for updating credentials to user account
+ RequestBody: { file : updated text or updated attached file
+ encrypted: boolean,
+ vault_pass: ansible vault password
+ }
+ return : response with successful credential update status
+ """
+ db_con = get_connection_users(DB_PATH)
+ try:
+ user = db_con.db_search_name(username)
+ if not current_user['username'] == username \
+ and not current_user['admin']:
+ return jsonify(message=errors.UNAUTHORIZED_REQUEST)
+ if 'creds_folder_name' not in request.form:
+ creds_folder = user['creds_folder']
+ else:
+ creds_folder = request.form['creds_folder_name']
+ if not os.listdir(WORKSPACE_PATH + CREDS_PATH + creds_folder). \
+ __contains__(file_name):
+ return jsonify(message=response.CREDENTIALS_FILE_NOT_FOUND)
+ if request.files:
+ file = request.files["file"]
+ file_read = file.read()
+ write = 'wb'
+ else:
+ file_read = request.form["file"]
+ write = 'w'
+ encrypted = request.form['encrypted']
+ if encrypted.lower() in ("true", "t"):
+ with open(WORKSPACE_PATH + CREDS_PATH + creds_folder +
+ "/" + file_name, write) as yaml_file:
+ yaml_file.write(file_read)
+ else:
+ vault_pass = request.form['vault_pass']
+ vault = Vault(vault_pass)
+ vault.dump(file_read, open(WORKSPACE_PATH + CREDS_PATH +
+ creds_folder + "/" + file_name, 'wb'))
+ return jsonify(message=response.CREDENTIALS_UPDATED)
+ except Exception as e:
+ app.logger.error(e)
+ return jsonify(status=errors.ERROR_STATUS, message=str(e))
+
+
+@app.route('/api/v1.0/users//credentials/',
+ methods=['DELETE'])
+@auth_required
+def delete_credentials(current_user, username, file_name) -> Response:
+ """
+ DELETE request to delete a credential file
+ return : response with successful delete status
+ """
+ db_con = get_connection_users(DB_PATH)
+ try:
+ user = db_con.db_search_name(username)
+ if not current_user['username'] == username \
+ and not current_user['admin']:
+ return jsonify(message=errors.UNAUTHORIZED_REQUEST)
+ if 'creds_folder_name' not in request.form:
+ creds_folder = user['creds_folder']
+ else:
+ creds_folder = request.form['creds_folder_name']
+ for w in os.listdir(WORKSPACE_PATH + CREDS_PATH +
+ creds_folder):
+ if w == file_name:
+ os.remove(WORKSPACE_PATH + CREDS_PATH +
+ creds_folder + "/" + w)
+ return jsonify(status=response.CREDENTIALS_DELETED,
+ mimetype='application/json')
+ return jsonify(status=response.CREDENTIALS_FILE_NOT_FOUND)
+ except Exception as e:
+ app.logger.error(e)
+ return jsonify(status=errors.ERROR_STATUS, message=str(e))
+
+
if __name__ == "__main__":
create_admin_user(DB_PATH, ADMIN_USERNAME,
ADMIN_PASSWORD, ADMIN_EMAIL)
diff --git a/app/config.yml b/app/config.yml
index 16354a9..2a236dc 100644
--- a/app/config.yml
+++ b/app/config.yml
@@ -11,9 +11,11 @@ inventory_path: /dummy/inventories/*
linchpin_latest_file_path: /dummy/resources/linchpin.latest
# path to Pinfile for dummy workspace
pinfile_json_path: /dummy/PinFile.json
-#details for admin user login
#file name for linchpin.latest file in resources
linchpin_latest_name: linchpin.latest
+#details for admin user login
admin_username: admin
admin_password: password
-admin_email: admin@xyz
\ No newline at end of file
+admin_email: admin@xyz
+# path to creds folder
+creds_path: /tmp
\ No newline at end of file
diff --git a/app/data_access_layer/UserBaseDB.py b/app/data_access_layer/UserBaseDB.py
index 0216bce..e2a86d1 100644
--- a/app/data_access_layer/UserBaseDB.py
+++ b/app/data_access_layer/UserBaseDB.py
@@ -42,3 +42,7 @@ def db_reset_api_key(self, username, new_api_key):
@abstractmethod
def db_update(self, username, updated_username, password_hash, email):
pass
+
+ @abstractmethod
+ def db_update_creds_folder(self, username, creds_folder):
+ pass
diff --git a/app/data_access_layer/UserRestDB.py b/app/data_access_layer/UserRestDB.py
index ba8300c..64a9821 100644
--- a/app/data_access_layer/UserRestDB.py
+++ b/app/data_access_layer/UserRestDB.py
@@ -24,9 +24,11 @@ def db_insert(self, username, password_hash, api_key_hash,
admin user
:param email: User email
"""
+ creds_folder = None
self.table.insert({'username': username, 'password': password_hash,
'api_key': api_key_hash,
- 'email': email, 'admin': admin})
+ 'email': email, 'admin': admin,
+ 'creds_folder': creds_folder})
def db_search_name(self, username) -> List[Dict]:
"""
@@ -36,7 +38,7 @@ def db_search_name(self, username) -> List[Dict]:
param username
"""
user = Query()
- return self.table.search(user.username == username)
+ return self.table.search(user.username == username)[0]
def db_list_all(self) -> List[Dict]:
"""
@@ -113,3 +115,8 @@ def db_update(self, username, updated_username, password_hash,
self.table.update({'username': updated_username,
'password': password_hash,
'email': email}, user.username == username)
+
+ def db_update_creds_folder(self, username, creds_folder):
+ user = Query()
+ self.table.update({'creds_folder': creds_folder},
+ user.username == username)
diff --git a/app/response_messages/response.py b/app/response_messages/response.py
index 3d8ecb8..7da667e 100644
--- a/app/response_messages/response.py
+++ b/app/response_messages/response.py
@@ -3,6 +3,7 @@
PROVISION_SUCCESS = "Workspace provisioned successfully"
DESTROY_SUCCESS = "Workspace/resources destroyed successfully"
NOT_FOUND = "Workspace does not exist"
+NOT_FOUND_USER = "User account does not exist"
DUPLICATE_WORKSPACE = "Workspace with same name found," \
"try again by renaming"
EMPTY_WORKSPACE = "Only public repositories can be "\
@@ -19,6 +20,7 @@
LINCHPIN_LATEST_NOT_FOUND = "linchpin.latest not found. " \
"Please check that it exists"\
" or specify linchpin_latest_path in request"
+CREDENTIALS_FILE_NOT_FOUND = "No credential file found with this name"
DESTROY_STATUS_SUCCESS = "Destroyed"
DESTROY_FAILED = "FAILED destroy"
STATUS_OK = "200 OK"
@@ -36,3 +38,6 @@
"the format route?api_key=value"
MISSING_USERNAME = "please provide the username in request route in "\
"the format route?username=value"
+CREDENTIALS_UPLOADED = "Credentials uploaded successfully"
+CREDENTIALS_UPDATED = "Credentials updated sccessfully"
+CREDENTIALS_DELETED = "Credentials deleted successfully"
diff --git a/app/utils/__init__.py b/app/utils/__init__.py
index 01ad95b..173ec59 100644
--- a/app/utils/__init__.py
+++ b/app/utils/__init__.py
@@ -1,10 +1,7 @@
import os
-import json
import uuid
from app.data_access_layer import RestDB
from app.data_access_layer import UserRestDB
-from app.response_messages import response
-from flask import jsonify
from typing import List
from werkzeug.security import generate_password_hash
@@ -74,13 +71,15 @@ def create_fetch_cmd(data, identity, workspace_dir) -> List[str]:
def create_cmd_workspace(data, identity, action,
- workspace_path, workspace_dir) -> List[str]:
+ workspace_path, workspace_dir,
+ creds_folder_path) -> List[str]:
"""
Creates a list to feed the subprocess for provisioning/
destroying existing workspaces
:param data: JSON data from POST requestBody
:param identity: unique uuid_name assigned to the workspace
:param action: up or destroy action
+ :param creds_folder_path: path to the credentials folder
:return a list for the subprocess to run
"""
if 'pinfile_path' in data:
@@ -89,14 +88,12 @@ def create_cmd_workspace(data, identity, action,
else:
check_path = identity
cmd = ["linchpin", "-w " + workspace_dir + check_path]
+ if 'creds_path' in data:
+ cmd.extend(("--creds-path", data['creds_path']))
+ else:
+ cmd.extend(("--creds-path", creds_folder_path))
if 'pinfile_name' in data:
cmd.extend(("-p", data['pinfile_name']))
- pinfile_name = data['pinfile_name']
- else:
- pinfile_name = "PinFile"
- if not check_workspace_has_pinfile(check_path, pinfile_name,
- workspace_path):
- return jsonify(status=response.PINFILE_NOT_FOUND)
cmd.append(action)
if 'tx_id' in data:
cmd.extend(("-t", data['tx_id']))
@@ -109,22 +106,23 @@ def create_cmd_workspace(data, identity, action,
def create_cmd_up_pinfile(data,
identity,
- workspace_path,
workspace_dir,
- pinfile_json_path) -> List[str]:
+ creds_folder_path) -> List[str]:
"""
Creates a list to feed the subprocess for provisioning
new workspaces instantiated using a pinfile
:param data: JSON data from POST requestBody
:param identity: unique uuid_name assigned to the workspace
+ :param creds_folder_path: path to the credentials folder
:return a list for the subprocess to run
"""
- pinfile_content = data['pinfile_content']
- json_pinfile_path = workspace_path + "/" + identity + pinfile_json_path
- with open(json_pinfile_path, 'w') as json_data:
- json.dump(pinfile_content, json_data)
cmd = ["linchpin", "-w " + workspace_dir + identity + "/dummy", "-p" +
- "PinFile.json", "up"]
+ "PinFile.json"]
+ if 'creds_path' in data:
+ cmd.extend(("--creds-path", data['creds_path']))
+ else:
+ cmd.extend(("--creds-path", creds_folder_path))
+ cmd.append("up")
if 'inventory_format' in data:
cmd.extend(("--if", data['inventory_format']))
return cmd
@@ -140,7 +138,7 @@ def check_workspace_has_pinfile(name, pinfile_name, workspace_path) -> bool:
return os.listdir(workspace_path + "/" + name).__contains__(pinfile_name)
-def check_workspace_empty(name, workspace_path) -> bool:
+def check_workspace_empty(name, workspace_path, working_dir) -> bool:
"""
Verifies if a workspace fetched/created is empty
:param name: name of the workspace to be verified
diff --git a/requirements.txt b/requirements.txt
index 6595851..74c1e19 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ Flask>=1.0.3
linchpin<=1.7.5
flask-swagger-ui>=3.20.9
flake8>=3.7.7
+ ansible-vault>=1.2.0
\ No newline at end of file
diff --git a/tests/UnitTest.py b/tests/UnitTest.py
new file mode 100644
index 0000000..d4bdefb
--- /dev/null
+++ b/tests/UnitTest.py
@@ -0,0 +1,55 @@
+import unittest
+import app
+from app.data_access_layer import RestDB
+from app.data_access_layer import UserRestDB
+
+BASE_URL = 'http://localhost:5000/api/v1.0'
+
+
+class UnitTest(unittest.TestCase):
+
+ def setUp(self):
+ self.app = app.app.test_client()
+ self.app.testing = True
+
+ def test_db_create_admin_user(self):
+ app.create_admin_user(app.DB_PATH, app.ADMIN_USERNAME,
+ app.ADMIN_PASSWORD, app.ADMIN_EMAIL)
+ self.assertEqual("admin", app.get_connection_users(app.DB_PATH).db_search_name("admin")['username'])
+
+ def test_get_connection(self):
+ self.assertIsInstance(app.get_connection(app.DB_PATH), RestDB.RestDB)
+
+ def test_get_connection_users(self):
+ self.assertIsInstance(app.get_connection_users(app.DB_PATH), UserRestDB.UserRestDB)
+
+ def test_create_fetch_cmd(self):
+ data = {"name": "test", "url": "www.github.com/CentOS-PaaS-SIG/linchpin",
+ "rootfolder":"/"}
+ identity = "test123"
+ self.assertEqual(app.create_fetch_cmd(data, identity, app.WORKSPACE_DIR),
+ ["linchpin", "-w " + app.WORKSPACE_DIR + identity, "fetch", "--root", data['rootfolder'],
+ data['url']])
+
+ def test_create_cmd_workspace(self):
+ data = {"id": "test123",
+ "provision_type": "workspace",
+ "pinfile_path": "/dummy/"
+ }
+ action = "up"
+ self.assertEqual(app.create_cmd_workspace(data, data['id'], action,
+ app.WORKSPACE_PATH, app.WORKSPACE_DIR,
+ app.CREDS_PATH), ["linchpin", "-w " + app.WORKSPACE_DIR + data['id'] + data['pinfile_path'],
+ "--creds-path", "/", "up"])
+
+ def test_create_cmd_up_pinfile(self):
+ data = {"id": "UnitTest",
+ "provision_type": "pinfile",
+ "pinfile_content":{"test":"pinfile_data"}}
+ self.assertEqual(app.create_cmd_up_pinfile(data, data['id'], app.WORKSPACE_DIR, app.CREDS_PATH),
+ ["linchpin", "-w " + app.WORKSPACE_DIR + data['id'] + "/dummy", "-p" +
+ "PinFile.json", "--creds-path", "/", "up"])
+
+
+if __name__ == '__main__':
+ unittest.main()