Skip to content

Commit

Permalink
code formatting, minor refactoring, comments
Browse files Browse the repository at this point in the history
  • Loading branch information
amitabhverma committed Jan 21, 2025
1 parent 282c4d5 commit a80a337
Show file tree
Hide file tree
Showing 6 changed files with 1,204 additions and 624 deletions.
8 changes: 4 additions & 4 deletions recOrder/cli/apply_inverse_transfer_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,16 +377,16 @@ def apply_inverse_transfer_function_cli(

doPrint = True # CLI prints Job status when used as cmd line
if unique_id != "": # no unique_id means no job submission info being listened to
JM.startClient()
JM.start_client()
i=0
for j in jobs:
job : submitit.Job = j
job_idx : str = job.job_id
position = input_position_dirpaths[i]
JM.putJobInList(job, unique_id, str(job_idx), position, str(executor.folder.absolute()))
JM.put_Job_in_list(job, unique_id, str(job_idx), position, str(executor.folder.absolute()))
i += 1
JM.sendDataThread()
JM.setShorterTimeout()
JM.send_data_thread()
JM.set_shorter_timeout()
doPrint = False # CLI printing disabled when using GUI

monitor_jobs(jobs, input_position_dirpaths, doPrint)
Expand Down
49 changes: 27 additions & 22 deletions recOrder/cli/jobs_mgmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@
class JobsManagement():

def __init__(self, *args, **kwargs):
self.executor = submitit.AutoExecutor(folder="logs")
self.clientsocket = None
self.uIDsjobIDs = {} # uIDsjobIDs[uid][jid] = job
self.DATA_QUEUE = []

def checkForJobIDFile(self, jobID, logsPath, extension="out"):
def check_for_jobID_File(self, jobID, logs_path, extension="out"):

if Path(logsPath).exists():
files = os.listdir(logsPath)
if Path(logs_path).exists():
files = os.listdir(logs_path)
try:
for file in files:
if file.endswith(extension):
if jobID in file:
file_path = os.path.join(logsPath, file)
file_path = os.path.join(logs_path, file)
f = open(file_path, "r")
txt = f.read()
f.close()
Expand All @@ -36,22 +35,28 @@ def checkForJobIDFile(self, jobID, logsPath, extension="out"):
print(exc.args)
return ""

def setShorterTimeout(self):
def set_shorter_timeout(self):
self.clientsocket.settimeout(30)

def startClient(self):
def start_client(self):
try:
self.clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.clientsocket.settimeout(300)
self.clientsocket.connect(('localhost', SERVER_PORT))
self.clientsocket.settimeout(None)

thread = threading.Thread(target=self.stopClient)
thread = threading.Thread(target=self.stop_client)
thread.start()
except Exception as exc:
print(exc.args)

def stopClient(self):
# The stopClient() is called right with the startClient() but does not stop
# and essentially is a wait thread listening and is triggered by either a
# connection or timeout. Based on condition triggered by user, reconstruction
# completion or errors the end goal is to close the socket connection which
# would let the CLI exit. I could break it down to 2 parts but the idea was to
# keep the clientsocket.close() call within one method to make it easier to follow.
def stop_client(self):
try:
time.sleep(2)
while True:
Expand All @@ -73,11 +78,11 @@ def stopClient(self):
job_idx = str(json_obj["jID"])
cmd = json_obj["command"]
if cmd == "clientRelease":
if self.hasSubmittedJob(u_idx, job_idx):
if self.has_submitted_job(u_idx, job_idx):
self.clientsocket.close()
break
if cmd == "cancel":
if self.hasSubmittedJob(u_idx, job_idx):
if self.has_submitted_job(u_idx, job_idx):
try:
job = self.uIDsjobIDs[u_idx][job_idx]
job.cancel()
Expand All @@ -104,7 +109,7 @@ def stopClient(self):
self.clientsocket.close()
print(exc.args)

def checkAllExpJobsCompletion(self, uID):
def check_all_ExpJobs_completion(self, uID):
if uID in SERVER_uIDsjobIDs.keys():
for jobEntry in SERVER_uIDsjobIDs[uID].keys():
job:submitit.Job = SERVER_uIDsjobIDs[uID][jobEntry]["job"]
Expand All @@ -115,24 +120,24 @@ def checkAllExpJobsCompletion(self, uID):
return False
return True

def putJobCompletionInList(self, jobBool, uID: str, jID: str, mode="client"):
def put_Job_completion_in_list(self, job_bool, uID: str, jID: str, mode="client"):
if uID in SERVER_uIDsjobIDs.keys():
if jID in SERVER_uIDsjobIDs[uID].keys():
SERVER_uIDsjobIDs[uID][jID]["bool"] = jobBool
SERVER_uIDsjobIDs[uID][jID]["bool"] = job_bool

def addData(self, data):
def add_data(self, data):
self.DATA_QUEUE.append(data)

def sendDataThread(self):
thread = threading.Thread(target=self.sendData)
def send_data_thread(self):
thread = threading.Thread(target=self.send_data)
thread.start()

def sendData(self):
def send_data(self):
data = "".join(self.DATA_QUEUE)
self.clientsocket.send(data.encode())
self.DATA_QUEUE = []

def putJobInList(self, job, uID: str, jID: str, well:str, log_folder_path:str="", mode="client"):
def put_Job_in_list(self, job, uID: str, jID: str, well:str, log_folder_path:str="", mode="client"):
try:
well = str(well)
jID = str(jID)
Expand All @@ -148,7 +153,7 @@ def putJobInList(self, job, uID: str, jID: str, well:str, log_folder_path:str=""
self.uIDsjobIDs[uID][jID] = job
json_obj = {uID:{"jID": str(jID), "pos": well, "log": log_folder_path}}
json_str = json.dumps(json_obj)+"\n"
self.addData(json_str)
self.add_data(json_str)
else:
# from server side jobs object entry is a None object
# this will be later checked as completion boolean for a ExpID which might
Expand All @@ -165,7 +170,7 @@ def putJobInList(self, job, uID: str, jID: str, well:str, log_folder_path:str=""
except Exception as exc:
print(exc.args)

def hasSubmittedJob(self, uID: str, jID: str, mode="client")->bool:
def has_submitted_job(self, uID: str, jID: str, mode="client")->bool:
jID = str(jID)
if mode == "client":
if uID in self.uIDsjobIDs.keys():
Expand Down
22 changes: 11 additions & 11 deletions recOrder/cli/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
import sys


def _move_cursor_up(n_lines, doPrint=True):
if doPrint:
def _move_cursor_up(n_lines, do_print=True):
if do_print:
sys.stdout.write("\033[F" * n_lines)


def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None, doPrint=True):
def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None, do_print=True):

columns = [15, 30, 40, 50]

# header
if doPrint:
if do_print:
sys.stdout.write(
"\033[K" # clear line
"\033[96mID" # cyan
Expand Down Expand Up @@ -48,7 +48,7 @@ def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None, doP
color = "\033[91m" # red

if i in print_indices:
if doPrint:
if do_print:
sys.stdout.write(
f"\033[K" # clear line
f"{color}{job.job_id}"
Expand All @@ -58,7 +58,7 @@ def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None, doP
f"\033[{columns[3]}G {elapsed_list[i]} s\n"
)
sys.stdout.flush()
if doPrint:
if do_print:
print(
f"\033[32m{complete_count}/{len(jobs)} jobs complete. "
"<ctrl+z> to move monitor to background. "
Expand Down Expand Up @@ -92,7 +92,7 @@ def _get_jobs_to_print(jobs, num_to_print):
return job_indices_to_print


def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path], doPrint=True):
def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path], do_print=True):
"""Displays the status of a list of submitit jobs with corresponding paths.
Parameters
Expand All @@ -113,7 +113,7 @@ def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path], doPrin

# print all jobs once if terminal is too small
if shutil.get_terminal_size().lines - NON_JOB_LINES < len(jobs):
_print_status(jobs, position_dirpaths, elapsed_list, doPrint)
_print_status(jobs, position_dirpaths, elapsed_list, do_print)

# main monitor loop
try:
Expand All @@ -130,15 +130,15 @@ def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path], doPrin
position_dirpaths,
elapsed_list,
job_indices_to_print,
doPrint,
do_print,
)

time.sleep(1)
_move_cursor_up(num_jobs_to_print + 2, doPrint)
_move_cursor_up(num_jobs_to_print + 2, do_print)

# Print final status
time.sleep(1)
_print_status(jobs, position_dirpaths, elapsed_list, doPrint=doPrint)
_print_status(jobs, position_dirpaths, elapsed_list, do_print=do_print)

# cancel jobs if ctrl+c
except KeyboardInterrupt:
Expand Down
2 changes: 1 addition & 1 deletion recOrder/plugin/main_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(self, napari_viewer: Viewer):
# Setup GUI elements
self.ui = gui.Ui_Form()
self.ui.setupUi(self)
self.ui.tab_reconstruction.setViewer(napari_viewer)
self.ui.tab_reconstruction.set_viewer(napari_viewer)

# Override initial tab focus
self.ui.tabWidget.setCurrentIndex(0)
Expand Down
Loading

0 comments on commit a80a337

Please sign in to comment.