diff --git a/backend/router/computing_provider.py b/backend/router/computing_provider.py index 6241f994..336610d1 100644 --- a/backend/router/computing_provider.py +++ b/backend/router/computing_provider.py @@ -46,6 +46,7 @@ def get_provider_info(): response_object.message = 'Provider not found' return jsonify(response_object.to_dict()), 200 + # connect the cp by sending a get request @cp_bp.route('/cp/connect', methods=['GET']) @jwt_or_api_token_required @@ -58,15 +59,15 @@ def get_provider_connection(): if not node_id: response.message = "node_id can not be empty." return jsonify(response.to_dict()), http.client.BAD_REQUEST - + provider = ComputingProvider.get_provider_by_node_id(node_id) if not provider or provider.user_id != user.id: response.message = "Provider is NOT found with the given node_id." return jsonify(response.to_dict()), http.client.NOT_FOUND - + try: url = multiaddress_to_url(provider.multi_address, "api/v1/computing/host/info") - res = requests.get(url, timeout=15, verify=False) #15s timeout + res = requests.get(url, timeout=15, verify=False) # 15s timeout if res.status_code == 200: response.message = "successfully get response 200 from provider." response.status = constant.STATUS_SUCCESS @@ -79,6 +80,7 @@ def get_provider_connection(): logging.info(f"Error sending request to provider with node_id: {node_id}. Error {str(e)}]") return jsonify(response.to_dict()), http.client.BAD_REQUEST + # register new cp @cp_bp.route('/cp', methods=['POST']) @api_token_required @@ -122,6 +124,7 @@ def create_provider_info(): response.message = "CP with node_id already created." return jsonify(response.to_dict()), http.client.OK + # update existing cp @cp_bp.route("/cp/update", methods=["POST"]) @api_token_required @@ -173,6 +176,7 @@ def update_provider_info(): return jsonify(response.to_dict()), http.client.OK + @cp_bp.route('/cp/summary', methods=['POST']) @jwt_or_api_token_required def update_provider_resources(): @@ -300,7 +304,7 @@ def get_provider_dashboard(): "total_used_vcpu": 0, } - for provider in providers_query.all(): + for provider in providers_query.all(): machine_data = machine_service.get_cp_machines_data(provider.id) if machine_data: all_machines_info["total_gpu"] += machine_data["gpu"]["total"] @@ -337,7 +341,7 @@ def get_provider_dashboard(): # get map info from that provider. cp_map_info = computing_provider_service.get_cp_map_info(provider.node_id) - city = cp_map_info["city"] + city = cp_map_info["city"] if city and city not in city_set: city_set.add(city) map_info.append(cp_map_info) @@ -392,7 +396,7 @@ def get_provider_dashboard(): if doc["_id"] in provider_data: provider_data[doc["_id"]]["uptime"] = doc["success"] / doc["total_requests"] - response.data = { + response.data = { "map_info": map_info, "total_providers": total_providers_cnt, "total_deployments": total_deployments, @@ -414,11 +418,12 @@ def get_provider_dashboard(): return jsonify(response.to_dict()), 200 + @cp_bp.route("/cp/machines", methods=["GET"]) def get_all_cp_machines(): response = Response(constant.STATUS_SUCCESS, message=None, data=None) - response.data = {"hardware" : []} + response.data = {"hardware": []} configs = config_service.get_all_configs() active_machines = computing_provider_service.get_all_active_machines() @@ -444,13 +449,13 @@ def get_all_cp_machines(): specs = machine.specs if not specs: continue - + if machine_service.validate_machine_spec(cfg_dict, specs): hardware_info["hardware_status"] = "available" cp = ComputingProvider.get_provider_by_id(machine.provider_id) if cp.region: machine_regions.add(cp.region) - + except Exception as e: logging.info(f"Error getting machine data for machine_id {machine.id}. Error {str(e)}]") @@ -459,7 +464,8 @@ def get_all_cp_machines(): return jsonify(response.to_dict()), http.client.OK -# given a node_id, check if any cp with that node_id is active + +# given a node_id, check if any cp with that node_id is active @cp_bp.route("/cp/active", methods=["GET"]) def get_active_cp(): response_object = Response(constant.STATUS_SUCCESS, message=None, data=None) @@ -478,6 +484,7 @@ def get_active_cp(): response_object.message = "Can not find any active cp with given node_id." return jsonify(response_object.to_dict()), 200 + @cp_bp.route('/cp/filter/', methods=['GET']) def filter_computing_providers_by_name(search_string: str): response_object = Response(constant.STATUS_SUCCESS, message=None, data=None) @@ -523,7 +530,7 @@ def get_computing_provider_details(node_id: str): @api_token_required def update_job_status(): response = Response(constant.STATUS_FAILED, message=None, data=None) - + data = request.get_json() uuid = data["job_uuid"] @@ -542,17 +549,17 @@ def update_job_status(): provider = computing_provider_service.get_cp_by_node_id(job.bidder_id) if provider is None: response.message = "Provider running the job not found." - logging.error(f'{user_id} : CURRENT USER' ) + logging.error(f'{user_id} : CURRENT USER') return jsonify(response.to_dict()), http.client.NOT_FOUND - + if provider.user_id != user_id: response.message = "Invalid permissions to update the status of this job." return jsonify(response.to_dict()), http.client.NOT_FOUND # Check if job is in a status that can be updated valid_statuses = [ - "paid", # This is old status that is replaced by Submitted, - # including for backwards compatibility + "paid", # This is old status that is replaced by Submitted, + # including for backwards compatibility constant.JOB_SUBMITTED, constant.JOB_DOWNLOAD_SOURCE, constant.JOB_UPLOAD_RESULT, @@ -566,7 +573,7 @@ def update_job_status(): response.message = "Job's status cannot be updated." return jsonify(response.to_dict()), http.client.BAD_REQUEST - if job.status!=constant.JOB_CANCELLED and job.status!=constant.JOB_CANCEL_FAILED: + if job.status != constant.JOB_CANCELLED and job.status != constant.JOB_CANCEL_FAILED: job.status = status update_record(job, db.session) # sync_service.send_sync_update("job", {"uuid":job.uuid}, {"status":job.status,"updated_at":job.updated_at}) @@ -582,21 +589,21 @@ def update_job_status(): response.message = "Job status updated" response.status = constant.SUCCESS return jsonify(response.to_dict()), 200 - + response.message = "Job already cancelled." response.status = constant.SUCCESS return jsonify(response.to_dict()), 200 @cp_bp.route('/cp//', methods=['GET']) -def cp_detail_for_space(node_id,space_uuid): +def cp_detail_for_space(node_id, space_uuid): response_object = Response(constant.STATUS_FAILED, message=None, data=None) cp = ComputingProvider.query.filter_by(node_id=node_id).first() space = Space.query.filter_by(uuid=space_uuid).first() response_object.data = { "payment_amount": 0, "running_time": 0, - "remaining_time":0, + "remaining_time": 0, "space_status": None } if cp is None: @@ -616,7 +623,7 @@ def cp_detail_for_space(node_id,space_uuid): response_object.message = "Space's task does not have a leading job" response_object.data["space_status"] = space.status return jsonify(response_object.to_dict()), 200 - payment_amount = computing_provider_service.calculate_job_payout(space,leading_job) + payment_amount = computing_provider_service.calculate_job_payout(space, leading_job) if type(payment_amount) == str: response_object.message = payment_amount else: @@ -624,5 +631,3 @@ def cp_detail_for_space(node_id,space_uuid): response_object.data["space_status"] = space.status response_object.status = constant.STATUS_SUCCESS return jsonify(response_object.to_dict()), 200 - - diff --git a/backend/router/stats.py b/backend/router/stats.py new file mode 100644 index 00000000..7683b009 --- /dev/null +++ b/backend/router/stats.py @@ -0,0 +1,53 @@ +""" All codes related to Stats""" +import logging +import traceback +from flask import jsonify, Blueprint +from backend.model.response import Response +from backend.service import job_service, user_service +from backend.utils import constant + +stats_bp = Blueprint("stats_bp", __name__) + + +@stats_bp.route('/stats/general', methods=['GET']) +def get_general_stats(): + """ + Retrieves general statistics about the system. + + Returns: + tuple: A tuple containing the JSON response and the HTTP status code. + + Raises: + None + """ + # Initialize response data structure + res_data = { + "total_jobs": None, + "total_running_jobs": None, + "total_leading_jobs": None, + "total_leading_job_duration": None, + "total_users": None, + "total_space_builders": None, + } + response = Response(status=constant.STATUS_FAILED, message=None) + + try: + # Fetch statistics from services + res_data["total_jobs"] = job_service.get_all_job() + res_data["total_running_jobs"] = job_service.get_all_running_job() + res_data["total_leading_jobs"] = job_service.get_all_leading_job() + res_data["total_leading_job_duration"] = job_service.get_all_leading_job_duration() + res_data["total_users"] = user_service.get_all_user() + res_data["total_space_builders"] = user_service.get_all_space_builder() + + # Update response object with fetched data + response.data = res_data + response.status = constant.STATUS_SUCCESS + response.message = "General stats found." + + return jsonify(response.to_dict()), 200 + except Exception as e: + # Log error and update response object in case of an exception + logging.error(f"Failed to retrieve general stats: {str(e)}\n{traceback.format_exc()}") + response.message = "Failed to retrieve general stats." + return jsonify(response.to_dict()), 500 diff --git a/backend/router/user.py b/backend/router/user.py index 309e387e..7dbd923d 100644 --- a/backend/router/user.py +++ b/backend/router/user.py @@ -29,216 +29,12 @@ def get_user_info(wallet_address): if user is None: response.message = "No user found." return jsonify(response.to_dict()), http.client.NOT_FOUND + # TODO: Remove unnecessary fields from response response.data = user.to_dict() response.status = constant.STATUS_SUCCESS return jsonify(response.to_dict()), http.client.OK - -@user_bp.route('/profile', methods=['Get']) -@jwt_or_api_token_required -def get_profile(user): - # TODO this API should request a lot of these data points individually as their own APIs - - # Get all incoming requests that need to be approved/rejected - license_requests_notifications = license_request_service.get_user_incoming_pending_requests(user) - # Sort request notifications by updated time - license_requests_notifications.sort(key=lambda lr: lr.updated_at, reverse=True) - - license_request_notif_list = [] - for license_req in license_requests_notifications: - license_req_dict = license_req.to_dict() - license_req_dict["contract_address"] = None - - tx_hash = None - owner = user_service.get_user_by_address(license_req.owner_address) - if license_req.source_type == "Space": - # Get contract address for license requests' space - space = Space.query.filter_by(name=license_req.name, user_id=owner.id).filter( - Space.status != constant.DELETED).first() - if space is None: - continue - tx_hash = TransactionHash.query.filter_by(source_type=constant.SPACE_NFT_SOURCE, - reference_id=space.id).first() - else: - # Get contract address for license requests' dataset - dataset = Dataset.query.filter_by(name=license_req.name, user_id=owner.id, status=constant.ACTIVE).first() - if dataset is None: - continue - tx_hash = TransactionHash.query.filter_by(source_type=constant.DATASET, reference_id=dataset.id).first() - - if tx_hash is not None: - scan_api = os.getenv("SCAN_URL") - scan_url = f"{scan_api}/get_factory_details?transaction_hash={tx_hash.transaction_hash}&chain_id={tx_hash.chain_id}" - - try: - res = requests.get(scan_url).content.decode("utf-8") - - if res != "NFT Factory record not found": - data = json.loads(res) - license_req_dict["contract_address"] = data["dataNFTAddress"] - except Exception as e: - logging.info(f"failed to get license tx hash: {str(e)}") - license_request_notif_list.append(license_req_dict) - - # Get all outgoing and pending requests - outgoing_pending_license_requests = license_request_service.get_user_outgoing_pending_requests(user) - outgoing_pending_license_requests_list = [request.to_dict() for request in outgoing_pending_license_requests] - - wallet_address = user.public_address - - dataset_list = [] - datasets = user_service.get_users_datasets(user) - for dataset in datasets: - dataset_dict = dataset.to_dict() - dataset_dict["wallet_address"] = wallet_address - dataset_list.append(dataset_dict) - - space_list = [] - spaces = Space.query.filter_by(is_public=True, user_id=user.id).filter(not_(Space.status == 'Deleted')) - for space in spaces: - space_dict = space.to_dict() - space_dict["wallet_address"] = wallet_address - space_list.append(space_dict) - - received_license_list = [] - - for license in license_service.get_user_received_space_licenses(user): - space = Space.query.filter(Space.status != constant.DELETED).filter_by(id=license.space_id).first() - if space is None: - continue - - license_dict = license.to_dict() - license_dict["type"] = constant.SPACE_LICENSE_SOURCE - license_dict["cid"] = None - if license.ipfs_uri is not None: - _, cid = license.ipfs_uri.split("/ipfs/") - license_dict["cid"] = cid - license_dict["name"] = space.name - - received_license_list.append(license_dict) - - for license in license_service.get_user_received_dataset_licenses(user): - dataset = Dataset.query.filter_by(id=license.dataset_id, status=constant.ACTIVE).first() - if dataset is None: - continue - - license_dict = license.to_dict() - license_dict["type"] = constant.DATASET_LICENSE_SOURCE - license_dict["cid"] = None - if license.ipfs_uri is not None: - _, cid = license.ipfs_uri.split("/ipfs/") - license_dict["cid"] = cid - license_dict["name"] = dataset.name - - received_license_list.append(license_dict) - - received_license_list.sort(key=lambda license: license["updated_at"], reverse=True) - - response = Response(constant.SUCCESS, message=None, data=None) - response.data = { - "user": user.to_dict(), - "dataset": dataset_list, - "space": space_list, - "license_requests_notifications": license_request_notif_list, - "outgoing_pending_license_requests": outgoing_pending_license_requests_list, - "received_licenses": received_license_list, - } - response.message = "User Profile found." - return jsonify(response.to_dict()), 200 - - -@user_bp.route('/profile', methods=['POST']) -@jwt_or_api_token_required -def update_profile(user): - full_name = request.form.get("full_name") - homepage = request.form.get("homepage") - github_username = request.form.get("github_username") - twitter_username = request.form.get("twitter_username") - - updated_user = user_service.update_user_data(user, full_name, homepage, github_username, twitter_username) - - response_object = Response(constant.SUCCESS) - response_object.message = "User updated." - response_object.data = { - "user": updated_user.to_dict(), - } - return jsonify(response_object.to_dict()), 200 - - -@user_bp.route('/user/tier/update', methods=['POST']) -@jwt_or_api_token_required -def update_user_tier(admin_user): - response = Response(constant.STATUS_FAILED, message=None, data=None) - - if not user_service.user_has_role(admin_user, constant.ROLE_ADMIN): - response.message = "User does not have permission." - return jsonify(response.to_dict()), http.client.FORBIDDEN - - user_address = request.form.get("wallet_address") - user = user_service.get_user_by_address(user_address) - if user is None: - response.message = "Invalid wallet address or no user found for given wallet." - return jsonify(response.to_dict()), http.client.BAD_REQUEST - - new_tier = request.form.get("tier") - try: - user_service.update_user_tier(user, new_tier) - except exception.InvalidTierError as e: - response.message = str(e) - return jsonify(response.to_dict()), http.client.BAD_REQUEST - except Exception as e: - response.message = "Failed to update user tier." - return jsonify(response.to_dict()), http.client.INTERNAL_SERVER_ERROR - - response.message = f"{user_address} updated to {new_tier} tier" - response.data = { - "updated_user": user.to_dict() - } - response.status = constant.STATUS_SUCCESS - - return jsonify(response.to_dict()), 200 - - -@user_bp.route('/user/profile/avatar', methods=['POST']) -@jwt_or_api_token_required -def upload_profile_avatar(user): - try: - avatar = request.files['avatar'] - except Exception: - response_msg = "Invalid parameters." - response_object = Response(constant.STATUS_FAILED, message=response_msg, data=None) - return jsonify(response_object.to_dict()), http.client.BAD_REQUEST - - try: - if not check_extension(avatar.filename, ['.png', '.jpeg', '.jpg', '.gif']): - response_msg = "Extention must be one of .png, .jpeg, .jpg, .gif." - response_object = Response(constant.STATUS_FAILED, message=response_msg, data=None) - return jsonify(response_object.to_dict()), http.client.BAD_REQUEST - - file = user_service.create_avatar(avatar, user.public_address) - if file == None: - response_msg = "File size must be smaller than 2 MiB." - response_object = Response(constant.STATUS_FAILED, message=response_msg, data=None) - return jsonify(response_object.to_dict()), http.client.BAD_REQUEST - except Exception as e: - logging.error(str(e) + '\n' + traceback.format_exc()) - response_object = Response(constant.STATUS_FAILED, message="Internal server error.", data=None) - return jsonify(response_object.to_dict()), http.client.INTERNAL_SERVER_ERROR - - try: - user_service.update_user_avatar(user, file.payloadCid) - except Exception as e: - logging.error(str(e) + '\n' + traceback.format_exc()) - response_object = Response(constant.STATUS_FAILED, message="Internal server error.", data=None) - return jsonify(response_object.to_dict()), http.client.INTERNAL_SERVER_ERROR - - response_data = { - "user": user.to_dict(), - } - response_object = Response(constant.STATUS_SUCCESS, message="Avatar uploaded.", data=response_data) - return jsonify(response_object.to_dict()), http.client.OK - - +# TODO: Need Modified @user_bp.route('/user/provider/payments', methods=["GET"]) @jwt_or_api_token_required def get_user_provider_payment_history(user):