Commit

Upload scripts are ready for check.
RubelMozumder committed Jan 29, 2025
1 parent 7edce2a commit 9e88e69
Showing 2 changed files with 108 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/pynxtools_spm/nomad_uploader/reader_config_setup.py
@@ -78,6 +78,7 @@ def convert_spm_experiments(
for file in input_params.input_file:
zipf.write(file, arcname=file.split("/")[-1])
input_params.zip_file_path = zip_file
print(f"########### print from : ", input_params.zip_file_path)
return zip_file


137 changes: 107 additions & 30 deletions src/pynxtools_spm/nomad_uploader/upload_srcript.py
@@ -2,22 +2,30 @@
SPMConvertInputParameters,
convert_spm_experiments,
)
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import asdict
import time

from pynxtools_spm.nomad_uploader.nomad_upload_api import (
get_authentication_token,
create_dataset,
upload_to_NOMAD,
edit_upload_metadata,
publish_upload,
check_upload_status,
)
from pynxtools_spm.nomad_uploader.files_movers import copy_directory_structure

from multiprocessing import Process, Lock, Queue
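# Each experiment is converted in its own Process; a shared Lock serializes the
# conversion work and a Queue carries the resulting zip paths back to the parent.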

current_dir = Path(__file__).parent

nomad_url = "https://nomad-lab.eu/prod/v1/test/api/v1/"
username = "Mozumder"
password = ""
password = "*#R516660a*#"
parallel_processing = True
time_for_cron_job = 10 * 60 # seconds
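# Time budget for one run of the script (it is intended to be triggered
# periodically, e.g. by a cron job); a third of it is granted to each
# conversion process and, again, to the upload loop below.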

exp_src_path = (
current_dir.parent.parent.parent
@@ -31,7 +39,7 @@
publish_to_nomad = False # Publish the upload to the central NOMAD repository

# List of SPMConvertInputParameters objects to run reader on each object
SPM_PARAMS_OBJ_L = ()
SPM_PARAMS_OBJ_L = []


def set_and_get_prepare_parameters(file):
@@ -81,40 +89,109 @@ def set_and_get_prepare_parameters(file):
)
if not params_obj:
return
global SPM_PARAMS_OBJ_L
SPM_PARAMS_OBJ_L = (*SPM_PARAMS_OBJ_L, params_obj)
# global SPM_PARAMS_OBJ_L
SPM_PARAMS_OBJ_L.append(params_obj)


def upload_to_nomad():
if __name__ == "__main__":
token = get_authentication_token(nomad_url, username, password)
if not token:
print("Authentication failed!")
exit(1)
copy_directory_structure(
exp_src_path, exp_dst_path, run_action_on_files=set_and_get_prepare_parameters
)

print("Files copied over to the destination directory.", SPM_PARAMS_OBJ_L)
# TODO: We can use parallel processing here
for input_params in SPM_PARAMS_OBJ_L:
zip_file = convert_spm_experiments(input_params)

tocken = get_authentication_token(nomad_url, username, password)
upload_id = upload_to_NOMAD(nomad_url, tocken, zip_file)

# To modify metadata
if modify_upload_metadata:
dataset_id = create_dataset(nomad_url, tocken, "Test_Dataset")
metadata = {
"metadata": {
"upload_name": "Test_Upload",
"references": ["https://doi.org/xx.xxxx/x.xxxx"],
"datasets": dataset_id,
"embargo_length": 0,
"coauthors": ["coauthor@affiliation.de"],
"comment": "This is a test upload...",
},
}
edit_upload_metadata(nomad_url, tocken, upload_id, metadata)

if publish_to_nomad:
publish_upload(nomad_url, tocken, upload_id)

if not parallel_processing:
pass

if __name__ == "__main__":
upload_to_nomad()
lock = Lock()
results_q = Queue()
time_out = int(time_for_cron_job / 3) # seconds

def queue_results(input_params, lock, results_q):
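# Worker body: acquire the shared lock (so only one conversion runs at a time,
# even though each experiment has its own process), convert the experiment,
# and put the resulting zip path on results_q; errors are printed and re-raised.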
lock.acquire()
try:
input_params = convert_spm_experiments(input_params)
results_q.put(input_params)
except Exception as e:
print(f"Oops! Error in processing {input_params.input_file}: {e}")
raise e
finally:
lock.release()

processes_list = []

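# Spawn one worker process per SPMConvertInputParameters object.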
for input_params in SPM_PARAMS_OBJ_L:
p = Process(
target=queue_results,
args=(input_params, lock, results_q),
)
p.start()
processes_list.append(p)
print("Processes started...", processes_list)
print("Processes started...", SPM_PARAMS_OBJ_L)
for p, input_params in zip(processes_list, SPM_PARAMS_OBJ_L):
p.join(time_out)
if p.is_alive():
print(
f"Process is still running, terminating it. Process handles input data {asdict(input_params)}",
)
p.terminate()
p.join()

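# Collect the zip archives produced by the workers from the results queue.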
indices = []
zips_to_be_upload = []
while not results_q.empty():
zips_to_be_upload.append(results_q.get())
print("############# All processes are done!", zips_to_be_upload)
upload_time_limit = datetime.now() + timedelta(seconds=time_out)

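# Upload every collected archive, tracking handled entries by index, until all
# are processed or the upload time limit is reached.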
while len(zips_to_be_upload) != len(indices) and datetime.now() < upload_time_limit:
for ind, zip_to_upload in enumerate(zips_to_be_upload):
print(f"######## zip file Processing {zip_to_upload}...")
if ind in indices: # already processed or failed
continue
if not zip_to_upload:
indices.append(ind)
continue
indices.append(ind)
upload_id = upload_to_NOMAD(nomad_url, token, zip_to_upload)
print(f"Upload ID: {upload_id}")
# To modify metadata
if modify_upload_metadata:
dataset_id = create_dataset(nomad_url, token, "Test_Dataset")
metadata = {
"metadata": {
"upload_name": "Test_Upload",
"references": ["https://doi.org/xx.xxxx/x.xxxx"],
"datasets": dataset_id,
"embargo_length": 0,
"coauthors": ["coauthor@affiliation.de"],
"comment": "This is a test upload...",
},
}
edit_upload_metadata(nomad_url, token, upload_id, metadata)

if publish_to_nomad:
publish_upload(nomad_url, token, upload_id)
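# Poll the upload status until NOMAD has finished processing the files
# or max_attempt checks have been made.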
massage = "Adding files"
max_attempt = 20
attempt = 0
while (
massage
in (
"Adding files",
"Matching",
"Waiting for results (level 0)",
)
and attempt < max_attempt
):
attempt += 1
massage = check_upload_status(nomad_url, token, upload_id)
time.sleep(2 / 60) # 1 second
print(f"Upload status: {massage}")
print("All done!", indices)
