diff --git a/src/pynxtools_spm/nomad_uploader/reader_config_setup.py b/src/pynxtools_spm/nomad_uploader/reader_config_setup.py
index 955ab8b..8c6766d 100644
--- a/src/pynxtools_spm/nomad_uploader/reader_config_setup.py
+++ b/src/pynxtools_spm/nomad_uploader/reader_config_setup.py
@@ -78,6 +78,7 @@ def convert_spm_experiments(
         for file in input_params.input_file:
             zipf.write(file, arcname=file.split("/")[-1])
         input_params.zip_file_path = zip_file
+    print(f"########### print from : ", input_params.zip_file_path)
     return zip_file
 
 
diff --git a/src/pynxtools_spm/nomad_uploader/upload_srcript.py b/src/pynxtools_spm/nomad_uploader/upload_srcript.py
index 1e7ac56..0560ee1 100644
--- a/src/pynxtools_spm/nomad_uploader/upload_srcript.py
+++ b/src/pynxtools_spm/nomad_uploader/upload_srcript.py
@@ -2,7 +2,10 @@
     SPMConvertInputParameters,
     convert_spm_experiments,
 )
+from datetime import datetime, timedelta
 from pathlib import Path
+from dataclasses import asdict
+import time
 
 from pynxtools_spm.nomad_uploader.nomad_upload_api import (
     get_authentication_token,
@@ -10,14 +13,19 @@
     upload_to_NOMAD,
     edit_upload_metadata,
     publish_upload,
+    check_upload_status,
 )
 from pynxtools_spm.nomad_uploader.files_movers import copy_directory_structure
 
+from multiprocessing import Process, Lock, Queue
+
 current_dir = Path(__file__).parent
 
 nomad_url = "https://nomad-lab.eu/prod/v1/test/api/v1/"
 username = "Mozumder"
-password = ""
+password = "********"  # do not hardcode real credentials here
+parallel_processing = True
+time_for_cron_job = 10 * 60  # seconds
 
 exp_src_path = (
     current_dir.parent.parent.parent
@@ -31,7 +39,7 @@
 publish_to_nomad = False  # Publish the upload to the central NOMAD repository
 
 # List of SPMConvertInputParameters objects to run reader on each object
-SPM_PARAMS_OBJ_L = ()
+SPM_PARAMS_OBJ_L = []
 
 
 def set_and_get_prepare_parameters(file):
@@ -81,40 +89,109 @@ def set_and_get_prepare_parameters(file):
     )
     if not params_obj:
         return
-    global SPM_PARAMS_OBJ_L
-    SPM_PARAMS_OBJ_L = (*SPM_PARAMS_OBJ_L, params_obj)
+    # global SPM_PARAMS_OBJ_L
+    SPM_PARAMS_OBJ_L.append(params_obj)
 
 
-def upload_to_nomad():
+if __name__ == "__main__":
+    token = get_authentication_token(nomad_url, username, password)
+    if not token:
+        print("Authentication failed!")
+        exit(1)
     copy_directory_structure(
         exp_src_path, exp_dst_path, run_action_on_files=set_and_get_prepare_parameters
     )
+    print("Files copied over to the destination directory.", SPM_PARAMS_OBJ_L)
     # TODO: We can use parallel processing here
-    for input_params in SPM_PARAMS_OBJ_L:
-        zip_file = convert_spm_experiments(input_params)
-
-        tocken = get_authentication_token(nomad_url, username, password)
-        upload_id = upload_to_NOMAD(nomad_url, tocken, zip_file)
-
-        # To modify metadata
-        if modify_upload_metadata:
-            dataset_id = create_dataset(nomad_url, tocken, "Test_Dataset")
-            metadata = {
-                "metadata": {
-                    "upload_name": "Test_Upload",
-                    "references": ["https://doi.org/xx.xxxx/x.xxxx"],
-                    "datasets": dataset_id,
-                    "embargo_length": 0,
-                    "coauthors": ["coauthor@affiliation.de"],
-                    "comment": "This is a test upload...",
-                },
-            }
-            edit_upload_metadata(nomad_url, tocken, upload_id, metadata)
-
-        if publish_to_nomad:
-            publish_upload(nomad_url, tocken, upload_id)
+    if not parallel_processing:
+        pass
 
-if __name__ == "__main__":
-    upload_to_nomad()
+    lock = Lock()
+    results_q = Queue()
+    time_out = int(time_for_cron_job / 3)  # seconds
+
+    def queue_results(input_params, lock, results_q):
+        lock.acquire()
+        try:
+            input_params = convert_spm_experiments(input_params)
+            results_q.put(input_params)
+        except Exception as e:
+            print(f"Oops! Error in processing {input_params.input_file}: {e}")
+            raise e
+        finally:
+            lock.release()
+
+    processes_list = []
+
+    for input_params in SPM_PARAMS_OBJ_L:
+        p = Process(
+            target=queue_results,
+            args=(input_params, lock, results_q),
+        )
+        p.start()
+        processes_list.append(p)
+    print("Processes started...", processes_list)
+    print("Processes started...", SPM_PARAMS_OBJ_L)
+    for _, (p, input_params) in enumerate(zip(processes_list, SPM_PARAMS_OBJ_L)):
+        p.join(time_out)
+        if p.is_alive():
+            print(
+                f"Process is still running, terminating it. Process handles input data {asdict(input_params)}",
+            )
+            p.terminate()
+            p.join()
+
+    indices = []
+    zips_to_be_upload = []
+    while not results_q.empty():
+        zips_to_be_upload.append(results_q.get())
+    print("############# All processes are done!", zips_to_be_upload)
+    upload_time_limit = datetime.now() + timedelta(seconds=time_out)
+
+    while len(zips_to_be_upload) != len(indices) and datetime.now() < upload_time_limit:
+        for ind, zip_to_upload in enumerate(zips_to_be_upload):
+            print(f"######## zip file Processing {zip_to_upload}...")
+            if ind in indices:  # already processed or failed
+                continue
+            if not zip_to_upload:
+                indices.append(ind)
+                continue
+            indices.append(ind)
+            upload_id = upload_to_NOMAD(nomad_url, token, zip_to_upload)
+            print(f"Upload ID: {upload_id}")
+            # To modify metadata
+            if modify_upload_metadata:
+                dataset_id = create_dataset(nomad_url, token, "Test_Dataset")
+                metadata = {
+                    "metadata": {
+                        "upload_name": "Test_Upload",
+                        "references": ["https://doi.org/xx.xxxx/x.xxxx"],
+                        "datasets": dataset_id,
+                        "embargo_length": 0,
+                        "coauthors": ["coauthor@affiliation.de"],
+                        "comment": "This is a test upload...",
+                    },
+                }
+                edit_upload_metadata(nomad_url, token, upload_id, metadata)
+
+            if publish_to_nomad:
+                publish_upload(nomad_url, token, upload_id)
+            message = "Adding files"
+            max_attempt = 20
+            attempt = 0
+            while (
+                message
+                in (
+                    "Adding files",
+                    "Matching",
+                    "Waiting for results (level 0)",
+                )
+                and attempt < max_attempt
+            ):
+                attempt += 1
+                message = check_upload_status(nomad_url, token, upload_id)
+                time.sleep(2 / 60)  # ~0.03 seconds between status checks
+                print(f"Upload status: {message}")
+    print("All done!", indices)
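
Note on the status-polling loop added above: `time.sleep(2 / 60)` pauses only about 0.03 seconds, so 20 attempts cover well under a second of processing time. Below is a minimal sketch of a longer-lived polling helper, assuming `check_upload_status(nomad_url, token, upload_id)` returns the processing-status string used in the loop above; the helper name and the `poll_interval_s` / `max_attempts` parameters are illustrative and not part of the existing API.

```python
import time

# Status strings that indicate NOMAD is still processing the upload
# (taken from the polling loop in upload_srcript.py above).
IN_PROGRESS_STATES = ("Adding files", "Matching", "Waiting for results (level 0)")


def wait_for_upload(nomad_url, token, upload_id, check_upload_status,
                    poll_interval_s=2.0, max_attempts=20):
    """Poll the upload status until it leaves the in-progress states or attempts run out."""
    status = "Adding files"
    for attempt in range(1, max_attempts + 1):
        status = check_upload_status(nomad_url, token, upload_id)
        print(f"Upload status ({attempt}/{max_attempts}): {status}")
        if status not in IN_PROGRESS_STATES:
            break
        time.sleep(poll_interval_s)  # wait a full interval, not a fraction of a second
    return status
```

Passing `check_upload_status` in as a callable keeps the sketch self-contained; in the script it would simply be the function imported from `nomad_upload_api`.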