Standalone auto-generated reconstruction GUI (#487)
* pydantic-model initial commit

- pydantic model UI widget creation based on the CLI model
- does no processing; only creates yaml files

* Updated prototype - partially working

- refactored the model-to-GUI implementation
- added a table to hold processing info; entries purge on completion
- using click testing for now while prototyping
- creates yaml from the GUI
- using napari notifications for messages
- testing on Windows

* first working alpha version

- multiprocessing using pydantic models
- implemented a client/server approach for managing the jobs queue
- updates each job's status based on logs
- implemented a unique ID in the CLI to associate jobs with a process
- tested on Windows with a locally modified submitit

* possible fix for macOS

- use OS-specific pydantic imports for ModelMetaclass

* use a version-specific fix for pydantic instead of an OS-specific one

- pydantic possibly needs to be pinned at 1.10.19 @talonchandler
- implemented a version-specific import for now
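Such a version-gated import can be sketched as follows — a hypothetical helper, with module paths taken from pydantic's documented 1.x/2.x layout (`pydantic.main` vs the `pydantic.v1` compatibility namespace):

```python
# Hypothetical helper: pick the import path for ModelMetaclass based on the
# installed pydantic version instead of the operating system.
def modelmetaclass_import_path(pydantic_version: str) -> str:
    major = int(pydantic_version.split(".")[0])
    if major < 2:
        return "pydantic.main"  # pydantic 1.x keeps ModelMetaclass here
    return "pydantic.v1.main"  # pydantic 2.x ships the v1 API under pydantic.v1

# In practice this maps onto a try/except import:
#   try:
#       from pydantic.main import ModelMetaclass  # pydantic < 2
#   except ImportError:
#       from pydantic.v1.main import ModelMetaclass
```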

* set a timeout limit on the job update thread

- check for updates to the job file; if none are detected, notify the user in the process table and exit after a 2 min timeout
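The watchdog described above can be sketched as a size-polling loop; this is a hypothetical reduction (the function name and the clock/sleep hooks are illustrative), not the plugin's actual thread:

```python
import time
from pathlib import Path

def watch_job_file(path: Path, timeout_s: float = 120.0, poll_s: float = 1.0,
                   clock=time.monotonic, sleep=time.sleep) -> bool:
    """Return True as soon as the job output file changes, or False if no
    update is seen for timeout_s seconds (the 2 min limit above)."""
    last_size = path.stat().st_size if path.exists() else -1
    deadline = clock() + timeout_s
    while clock() < deadline:
        size = path.stat().st_size if path.exists() else -1
        if size != last_size:
            return True  # the file was updated; keep the table entry alive
        sleep(poll_s)
    return False  # stale job: notify the user in the process table and exit
```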

* fixes & enhancements

- fixes newly created yaml files to reside beside the output zarrs
- fixes model numbers not being represented correctly on the Build & Run button
- fixes "reconstruction algorithm" to show up as a dropdown box
- fixes the "regularization strength" spinbox to accept values with 5 significant decimal places
- background path now accepts Path and str in the model and shows up as a directory-selector button
- each model container setting is now a collapsible item
- fixes issues when the widget is closed/hidden and then started again

- all model validation errors are now collected, presented in a notification, and highlighted on the GUI in one go
- each collapsible model entry is also highlighted if any validation errors are present
- added a confirm dialog for certain actions
- added a clear-results button
- the job update thread will exit after a timeout limit when no update is detected in the job output file
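The collect-everything-then-report flow can be sketched generically; here validators are plain callables that raise `ValueError`, standing in for pydantic model construction:

```python
# Sketch: validate every model, gather (model_name, error) pairs, and report
# them together instead of stopping at the first failure. In the plugin, the
# returned list would drive both the notification text and which collapsible
# entries get highlighted.
def collect_validation_errors(models: dict) -> list:
    errors = []
    for name, validate in models.items():
        try:
            validate()
        except ValueError as exc:
            errors.append((name, str(exc)))
    return errors
```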

* Update setup.cfg

- pinning the pydantic lib version to 1.10.19

* use PyQt6 for pyqtSignal

* standalone

- fixes standalone widget

* fixes for BG, etc.

- fixes BG selection and resetting when the Path is invalid
- fixes row removal/stability - using a Queued List to counter race conditions

- added a .zarr data validation check for Input, also used for BG
- made uniqueID more unique
- break thread update operations when the user has cleared the results table
- standalone GUI niceties
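Making a unique ID "more unique" can be sketched by combining a timestamp with a UUID fragment (the format below is illustrative, not the plugin's actual scheme):

```python
import time
import uuid

def make_unique_id(prefix: str = "recOrder") -> str:
    """Hypothetical sketch: a timestamp plus a UUID4 fragment, so that
    concurrently launched runs cannot collide on the same identifier."""
    return f"{prefix}-{int(time.time())}-{uuid.uuid4().hex[:8]}"
```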

* fixes for user-initiated row delete while processing

- added another check for the case where a result row is deleted by user action while processing

* major refactor to support positions in datasets

- timeout of 5 min per Job
- BF button error handling when not available
- the GUI clears logs on first run to avoid file-exists errors

- a single processing run now supports the CLI spawning multiple jobs

* fixes & enhancements

- stand-alone GUI cmd
- refactored the UI based on suggestions
- fixes a cyclic import for the stand-alone GUI
- duplicates previous model settings when available
- clears the Model when changing Input datasets
- load model now looks for .yml files

* checking for RuntimeWarning value

* on-the-fly processing

- checks whether data is being acquired and uses polling (every 10 s) to gather processing information, submitting processing jobs once a minimum dimension has been acquired
- added a script to simulate acquisition of .zarr storage
- added group boxes for sections
- added scrolling for the tab so that it does not block horizontal resizing
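A minimal sketch of that polling loop, with the acquisition check and job submission injected as callables (all names here are illustrative, not the plugin's API):

```python
import time

def poll_and_submit(acquired_count, submit_job, min_count: int,
                    interval_s: float = 10.0, sleep=time.sleep,
                    max_polls: int = 10_000) -> bool:
    """Every interval_s seconds, check how much data has been acquired and
    submit a processing job once min_count is reached. Returns True when a
    job was submitted, False if polling gave up."""
    for _ in range(max_polls):
        if acquired_count() >= min_count:
            submit_job()
            return True
        sleep(interval_s)
    return False
```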

* Delete recOrder/tests/widget_tests/test_pydantic_model_widget.py

* Delete recOrder/tests/widget_tests/test_simulate_acq.py

* ditching v1.main from pydantic and reverting to pydantic>=1.10.17 to test workflows

* don't initialize server listening in the worker class's main init

- moved socket listening from __init__() to a sub-method, initialized only when required for the first time (testing build & deploy)
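The lazy-initialization pattern can be sketched like this (class and method names are illustrative, not the actual worker class):

```python
import socket

class Worker:
    """Sketch: no socket work at construction time; the listener is created
    only when something first needs it."""

    def __init__(self, port: int = 8089):
        self.port = port
        self._sock = None  # deferred until first use

    def _ensure_listening(self) -> socket.socket:
        if self._sock is None:
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._sock.bind(("localhost", self.port))
            self._sock.listen()
        return self._sock
```

Keeping `__init__` side-effect free means simply importing or constructing the worker (as a build/deploy check might) no longer grabs a port.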

* incorporating discussed GUI changes

- GUI changes to reflect the sketch discussed with @talonchandler @ieivanov
- Output directory and validation
- Open the dataset after reconstruction based on user choice
- pinning the pydantic lib back to 1.10.19

* exit polling loop when closing app before finish

* implemented a Stop method for On-The-Fly polling reconstructions, if required

* GUI related

- added an info icon next to the Input Store label when a dataset path is set; it displays channel names for convenience, and more metadata relevant for reconstruction could be displayed
- removed the line widget at the end of each model container to make the scrollbar more apparent

* create the logs dir if it does not exist

- we only read the location here, but it needs to exist; the CLI creates the logs based on the path
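A sketch of the directory guard, assuming the `<output>_logs` naming used by the executor setup in this commit (the helper name is hypothetical):

```python
from pathlib import Path

def ensure_logs_dir(output_dirpath: Path) -> Path:
    """Create the <output>_logs folder beside the output dataset if it does
    not exist yet, and return its path."""
    logs = output_dirpath.parent / (output_dirpath.stem + "_logs")
    logs.mkdir(parents=True, exist_ok=True)  # no-op if it already exists
    return logs
```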

* fixes output path not setting correctly

* added pixel size meta to info icon

* update output dir in defined models when changed

* make on-the-fly entry scrollable and not block resizing

* added script to simulate a "fake" recOrder acquisition

- script to test on-the-fly reconstruction POC

* fix for checking output path existing

* fix for checking output path existing

* logs folder to be created beside the dataset

- the logs folder will reside next to the output dataset
- fixed an issue with reconstructed data being displayed irrespective of selection

* display SLURM-related errors if the Job's output txt is empty

* top scrollbar for the model; container sizing no longer needs a second vertical scrollbar

* multi-pos bugfix + multiple enhancements

fixes:
- a multi-pos dataset would be displayed after single-pos processing

enhancements:
- the CLI will print Job status when used from the command line, but not for the GUI
- use a single socket connection when multi-pos is spawned by a request
- added an "rx" field to the model container
- minor GUI tweaks

* code formatting, minor refactoring, comments

* with rx defaulting to 1; catch and report OOM errors
amitabhverma authored Jan 23, 2025
1 parent 29e804f commit e2eb24a
Showing 16 changed files with 4,424 additions and 45 deletions.
1 change: 1 addition & 0 deletions recOrder/acq/acquisition_workers.py
@@ -598,6 +598,7 @@ def _reconstruct(self):
transfer_function_dirpath=transfer_function_path,
config_filepath=self.config_path,
output_dirpath=reconstruction_path,
unique_id="recOrderAcq"
)

# Read reconstruction to pass to emitters
42 changes: 33 additions & 9 deletions recOrder/cli/apply_inverse_transfer_function.py
@@ -3,13 +3,17 @@
from functools import partial
from pathlib import Path

import os
import click
import numpy as np
import torch
import torch.multiprocessing as mp
import submitit
from iohub import open_ome_zarr

from typing import Final
from recOrder.cli import jobs_mgmt

from recOrder.cli import apply_inverse_models
from recOrder.cli.parsing import (
config_filepath,
@@ -18,6 +22,7 @@
processes_option,
transfer_function_dirpath,
ram_multiplier,
unique_id,
)
from recOrder.cli.printing import echo_headline, echo_settings
from recOrder.cli.settings import ReconstructionSettings
@@ -28,6 +33,7 @@
from recOrder.io import utils
from recOrder.cli.monitor import monitor_jobs

JM = jobs_mgmt.JobsManagement()

def _check_background_consistency(
background_shape, data_shape, input_channel_names
@@ -293,6 +299,7 @@ def apply_inverse_transfer_function_cli(
output_dirpath: Path,
num_processes: int = 1,
ram_multiplier: float = 1.0,
unique_id: str = ""
) -> None:
output_metadata = get_reconstruction_output_metadata(
input_position_dirpaths[0], config_filepath
@@ -336,36 +343,53 @@ def apply_inverse_transfer_function_cli(
f"{cpu_request} CPU{'s' if cpu_request > 1 else ''} and "
f"{gb_ram_request} GB of memory per CPU."
)
executor = submitit.AutoExecutor(folder="logs")


name_without_ext = os.path.splitext(Path(output_dirpath).name)[0]
executor_folder = os.path.join(Path(output_dirpath).parent.absolute(), name_without_ext + "_logs")
executor = submitit.AutoExecutor(folder=Path(executor_folder))

executor.update_parameters(
slurm_array_parallelism=np.min([50, num_jobs]),
slurm_mem_per_cpu=f"{gb_ram_request}G",
slurm_cpus_per_task=cpu_request,
slurm_time=60,
slurm_partition="cpu",
timeout_min=jobs_mgmt.JOBS_TIMEOUT
# more slurm_*** resource parameters here
)

jobs = []
with executor.batch():
for input_position_dirpath in input_position_dirpaths:
jobs.append(
executor.submit(
for input_position_dirpath in input_position_dirpaths:
job: Final = executor.submit(
apply_inverse_transfer_function_single_position,
input_position_dirpath,
transfer_function_dirpath,
config_filepath,
output_dirpath / Path(*input_position_dirpath.parts[-3:]),
num_processes,
output_metadata["channel_names"],
)
)
)
jobs.append(job)
echo_headline(
f"{num_jobs} job{'s' if num_jobs > 1 else ''} submitted {'locally' if executor.cluster == 'local' else 'via ' + executor.cluster}."
)

monitor_jobs(jobs, input_position_dirpaths)
    doPrint = True  # CLI prints Job status when used as cmd line
    if unique_id != "":  # no unique_id means no job submission info is being listened to
        JM.start_client()
        for i, job in enumerate(jobs):
            position = input_position_dirpaths[i]
            JM.put_Job_in_list(job, unique_id, str(job.job_id), position, str(executor.folder.absolute()))
        JM.send_data_thread()
        JM.set_shorter_timeout()
        doPrint = False  # CLI printing disabled when using GUI

monitor_jobs(jobs, input_position_dirpaths, doPrint)


@click.command()
41 changes: 41 additions & 0 deletions recOrder/cli/gui_widget.py
@@ -0,0 +1,41 @@
import sys
from PyQt6.QtWidgets import QApplication, QWidget, QVBoxLayout, QStyle
import click
from recOrder.plugin import tab_recon

try:
import qdarktheme
except ImportError:
    pass  # qdarktheme is an optional dependency

PLUGIN_NAME = "recOrder: Computational Toolkit for Label-Free Imaging"
PLUGIN_ICON = "🔬"

@click.command()
def gui():
"""GUI for recOrder: Computational Toolkit for Label-Free Imaging"""

app = QApplication(sys.argv)
app.setStyle("Fusion") # Other options: "Fusion", "Windows", "macOS", "WindowsVista"
try:
qdarktheme.setup_theme("dark")
    except Exception:
        pass  # fall back to the default theme
window = MainWindow()
window.setWindowTitle(PLUGIN_ICON + " " + PLUGIN_NAME + " " + PLUGIN_ICON)

pixmapi = getattr(QStyle.StandardPixmap, "SP_TitleBarMenuButton")
icon = app.style().standardIcon(pixmapi)
window.setWindowIcon(icon)

window.show()
sys.exit(app.exec())

class MainWindow(QWidget):
def __init__(self):
super().__init__()
recon_tab = tab_recon.Ui_ReconTab_Form(stand_alone=True)
layout = QVBoxLayout()
self.setLayout(layout)
layout.addWidget(recon_tab.recon_tab_mainScrollArea)

if __name__ == "__main__":
gui()
184 changes: 184 additions & 0 deletions recOrder/cli/jobs_mgmt.py
@@ -0,0 +1,184 @@
import os, json
from pathlib import Path
import socket
import submitit
import threading, time

DIR_PATH = os.path.dirname(os.path.realpath(__file__))
FILE_PATH = os.path.join(DIR_PATH, "main.py")

SERVER_PORT = 8089 # Choose an available port
JOBS_TIMEOUT = 5 # 5 mins
SERVER_uIDsjobIDs = {} # uIDsjobIDs[uid][jid] = job

class JobsManagement():

def __init__(self, *args, **kwargs):
self.clientsocket = None
self.uIDsjobIDs = {} # uIDsjobIDs[uid][jid] = job
self.DATA_QUEUE = []

def check_for_jobID_File(self, jobID, logs_path, extension="out"):

if Path(logs_path).exists():
files = os.listdir(logs_path)
try:
for file in files:
if file.endswith(extension):
if jobID in file:
file_path = os.path.join(logs_path, file)
                            with open(file_path, "r") as f:
                                txt = f.read()
return txt
except Exception as exc:
print(exc.args)
return ""

def set_shorter_timeout(self):
self.clientsocket.settimeout(30)

def start_client(self):
try:
self.clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.clientsocket.settimeout(300)
self.clientsocket.connect(('localhost', SERVER_PORT))
self.clientsocket.settimeout(None)

thread = threading.Thread(target=self.stop_client)
thread.start()
except Exception as exc:
print(exc.args)

    # stop_client() is started together with start_client(), but it does not
    # stop right away; it is essentially a wait thread that listens and is
    # triggered by either a connection or a timeout. Whether triggered by the
    # user, by reconstruction completion, or by errors, the end goal is to
    # close the socket connection, which lets the CLI exit. This could be
    # split into two parts, but keeping the clientsocket.close() call within
    # one method makes it easier to follow.
def stop_client(self):
try:
time.sleep(2)
while True:
time.sleep(1)
buf = ""
try:
buf = self.clientsocket.recv(1024)
except:
pass
if len(buf) > 0:
if b"\n" in buf:
dataList = buf.split(b"\n")
for data in dataList:
if len(data)>0:
decoded_string = data.decode()
json_str = str(decoded_string)
json_obj = json.loads(json_str)
u_idx = json_obj["uID"]
job_idx = str(json_obj["jID"])
cmd = json_obj["command"]
if cmd == "clientRelease":
if self.has_submitted_job(u_idx, job_idx):
self.clientsocket.close()
break
if cmd == "cancel":
if self.has_submitted_job(u_idx, job_idx):
try:
job = self.uIDsjobIDs[u_idx][job_idx]
job.cancel()
except Exception as exc:
pass # possibility of throwing an exception based on diff. OS
forDeletions = []
for uID in self.uIDsjobIDs.keys():
for jID in self.uIDsjobIDs[uID].keys():
job = self.uIDsjobIDs[uID][jID]
if job.done():
forDeletions.append((uID, jID))
for idx in range(len(forDeletions)):
del self.uIDsjobIDs[forDeletions[idx][0]][forDeletions[idx][1]]
forDeletions = []
for uID in self.uIDsjobIDs.keys():
if len(self.uIDsjobIDs[uID].keys()) == 0:
forDeletions.append(uID)
for idx in range(len(forDeletions)):
del self.uIDsjobIDs[forDeletions[idx]]
if len(self.uIDsjobIDs.keys()) == 0:
self.clientsocket.close()
break
except Exception as exc:
self.clientsocket.close()
print(exc.args)

def check_all_ExpJobs_completion(self, uID):
if uID in SERVER_uIDsjobIDs.keys():
for jobEntry in SERVER_uIDsjobIDs[uID].keys():
job:submitit.Job = SERVER_uIDsjobIDs[uID][jobEntry]["job"]
jobBool = SERVER_uIDsjobIDs[uID][jobEntry]["bool"]
if job is not None and job.done() == False:
return False
if jobBool == False:
return False
return True

def put_Job_completion_in_list(self, job_bool, uID: str, jID: str, mode="client"):
if uID in SERVER_uIDsjobIDs.keys():
if jID in SERVER_uIDsjobIDs[uID].keys():
SERVER_uIDsjobIDs[uID][jID]["bool"] = job_bool

def add_data(self, data):
self.DATA_QUEUE.append(data)

def send_data_thread(self):
thread = threading.Thread(target=self.send_data)
thread.start()

def send_data(self):
data = "".join(self.DATA_QUEUE)
self.clientsocket.send(data.encode())
self.DATA_QUEUE = []

def put_Job_in_list(self, job, uID: str, jID: str, well:str, log_folder_path:str="", mode="client"):
try:
well = str(well)
jID = str(jID)
if ".zarr" in well:
wells = well.split(".zarr")
well = wells[1].replace("\\","-").replace("/","-")[1:]
if mode == "client":
if uID not in self.uIDsjobIDs.keys():
self.uIDsjobIDs[uID] = {}
self.uIDsjobIDs[uID][jID] = job
else:
if jID not in self.uIDsjobIDs[uID].keys():
self.uIDsjobIDs[uID][jID] = job
json_obj = {uID:{"jID": str(jID), "pos": well, "log": log_folder_path}}
json_str = json.dumps(json_obj)+"\n"
self.add_data(json_str)
else:
# from server side jobs object entry is a None object
# this will be later checked as completion boolean for a ExpID which might
# have several Jobs associated with it
if uID not in SERVER_uIDsjobIDs.keys():
SERVER_uIDsjobIDs[uID] = {}
SERVER_uIDsjobIDs[uID][jID] = {}
SERVER_uIDsjobIDs[uID][jID]["job"] = job
SERVER_uIDsjobIDs[uID][jID]["bool"] = False
else:
SERVER_uIDsjobIDs[uID][jID] = {}
SERVER_uIDsjobIDs[uID][jID]["job"] = job
SERVER_uIDsjobIDs[uID][jID]["bool"] = False
except Exception as exc:
print(exc.args)

def has_submitted_job(self, uID: str, jID: str, mode="client")->bool:
jID = str(jID)
if mode == "client":
if uID in self.uIDsjobIDs.keys():
if jID in self.uIDsjobIDs[uID].keys():
return True
return False
else:
if uID in SERVER_uIDsjobIDs.keys():
if jID in SERVER_uIDsjobIDs[uID].keys():
return True
return False
6 changes: 6 additions & 0 deletions recOrder/cli/main.py
@@ -3,6 +3,8 @@
from recOrder.cli.apply_inverse_transfer_function import apply_inv_tf
from recOrder.cli.compute_transfer_function import compute_tf
from recOrder.cli.reconstruct import reconstruct
from recOrder.cli.gui_widget import gui


CONTEXT = {"help_option_names": ["-h", "--help"]}

@@ -21,3 +23,7 @@ def cli():
cli.add_command(reconstruct)
cli.add_command(compute_tf)
cli.add_command(apply_inv_tf)
cli.add_command(gui)

if __name__ == "__main__":
cli()
