Skip to content

Commit

Permalink
feat(imagetool.manager): implement threaded file loading
Browse files Browse the repository at this point in the history
Data are now loaded in the background.
  • Loading branch information
kmnhan committed Jan 4, 2025
1 parent 00ab074 commit feeb06b
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 46 deletions.
173 changes: 173 additions & 0 deletions src/erlab/interactive/imagetool/manager/_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Helper classes for loading multiple files sequentially in a separate thread."""

from __future__ import annotations

__all__ = ["_MultiFileHandler"]

import collections
import logging
import pathlib
import weakref
from typing import TYPE_CHECKING, Any, cast

from qtpy import QtCore, QtWidgets

import erlab

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from collections.abc import Callable

import xarray as xr

from erlab.interactive.imagetool.manager import ImageToolManager


class _DataLoaderSignals(QtCore.QObject):
sigLoaded = QtCore.Signal(pathlib.Path, list)
sigFailed = QtCore.Signal(pathlib.Path, str)


class _DataLoader(QtCore.QRunnable):
def __init__(self, file_path: pathlib.Path, func: Callable, kwargs: dict[str, Any]):
super().__init__()
self.signals: _DataLoaderSignals = _DataLoaderSignals()

self._file_path = file_path
self._func = func
self._kwargs = kwargs

@QtCore.Slot()
def run(self) -> None:
try:
data_list: list[xr.DataArray] = (
erlab.interactive.imagetool.core._parse_input(
self._func(self._file_path, **self._kwargs)
)
)
except Exception as e:
logger.exception("Error loading data from %s", self._file_path)
self.signals.sigFailed.emit(self._file_path, f"{type(e).__name__}: {e}")
else:
self.signals.sigLoaded.emit(self._file_path, data_list)


class _MultiFileHandler(QtCore.QObject):
"""Manage the loading of multiple files in a separate thread.
Parameters
----------
manager
The manager instance.
file_list
List of file paths to be loaded.
func
The function to be called for loading each file.
kwargs
Additional keyword arguments to be passed to ``func``.
Attributes
----------
loaded
List of successfully loaded files.
failed
List of files that failed to load.
Signals
-------
sigFinished()
Emitted when the loading process has finished. The signal is emitted even if the
loading process was aborted due to an error.
"""

sigFinished = QtCore.Signal() #: :meta private:

def __init__(
self,
manager: ImageToolManager,
file_list: list[pathlib.Path],
func: Callable,
kwargs: dict[str, Any],
):
super().__init__(manager)

self._manager = weakref.ref(manager)
self._queue: collections.deque[pathlib.Path] = collections.deque(file_list)
self._func = func
self._kwargs = kwargs

self.loaded: list[pathlib.Path] = []
self.failed: list[pathlib.Path] = []

@property
def _threadpool(self) -> QtCore.QThreadPool:
return cast(QtCore.QThreadPool, QtCore.QThreadPool.globalInstance())

@property
def manager(self) -> ImageToolManager:
"""Access the parent manager instance."""
_manager = self._manager()
if _manager is None:
raise LookupError("Parent was destroyed")
return _manager

@property
def queued(self) -> list[pathlib.Path]:
"""List of files that are yet to be loaded."""
return list(self._queue)

def start(self) -> None:
"""Initiate the loading process.
This method should be only called once.
"""
self._load_next()

def _load_next(self) -> None:
"""Load the next file in the queue."""
if len(self._queue) == 0:
self.sigFinished.emit()
return

file_path = self._queue.popleft()

self.manager._status_bar.showMessage(f"Loading {file_path.name}...")

loader = _DataLoader(file_path, self._func, self._kwargs)
loader.signals.sigLoaded.connect(self._on_loaded)
loader.signals.sigFailed.connect(self._on_failed)
self._threadpool.start(loader)

@QtCore.Slot(pathlib.Path, list)
def _on_loaded(
self, file_path: pathlib.Path, data_list: list[xr.DataArray]
) -> None:
self.manager._status_bar.showMessage("")
self.manager._data_recv(data_list, kwargs={"file_path": file_path})
self.loaded.append(file_path)
self._load_next()

@QtCore.Slot(pathlib.Path, str)
def _on_failed(self, file_path: pathlib.Path, exc_str: str) -> None:
self.manager._status_bar.showMessage("")
self.failed.append(file_path)

msg_box = QtWidgets.QMessageBox(self.manager)
msg_box.setIcon(QtWidgets.QMessageBox.Icon.Warning)
msg_box.setText(f"Failed to load {file_path.name}")
msg_box.setInformativeText(
"Do you want to skip this file and continue loading?"
)
msg_box.setStandardButtons(
QtWidgets.QMessageBox.StandardButton.Abort
| QtWidgets.QMessageBox.StandardButton.Yes
)
msg_box.setDefaultButton(QtWidgets.QMessageBox.StandardButton.Yes)
msg_box.setDetailedText(exc_str)
match msg_box.exec():
case QtWidgets.QMessageBox.StandardButton.Yes:
self._load_next()
case QtWidgets.QMessageBox.StandardButton.Abort:
self.sigFinished.emit()
87 changes: 41 additions & 46 deletions src/erlab/interactive/imagetool/manager/_mainwindow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_RenameDialog,
_StoreDialog,
)
from erlab.interactive.imagetool.manager._io import _MultiFileHandler
from erlab.interactive.imagetool.manager._modelview import _ImageToolWrapperListView
from erlab.interactive.imagetool.manager._server import _ManagerServer, show_in_manager
from erlab.interactive.imagetool.manager._wrapper import _ImageToolWrapper
Expand Down Expand Up @@ -337,6 +338,12 @@ def __init__(self: ImageToolManager) -> None:
self._kb_filter = erlab.interactive.utils.KeyboardEventFilter(self)
self.text_box.installEventFilter(self._kb_filter)

# File handlers for multithreaded file loading
self._file_handlers: set[_MultiFileHandler] = set()

# Initialize status bar
self._status_bar.showMessage("")

@property
def cache_dir(self) -> str:
"""Name of the cache directory where archived data are stored."""
Expand All @@ -352,6 +359,10 @@ def next_idx(self) -> int:
"""Index for the next ImageTool window."""
return max(self._tool_wrappers.keys(), default=-1) + 1

@property
def _status_bar(self) -> QtWidgets.QStatusBar:
return cast(QtWidgets.QStatusBar, self.statusBar())

@QtCore.Slot()
def about(self) -> None:
"""Show the about dialog."""
Expand Down Expand Up @@ -709,12 +720,15 @@ def _to_datatree(self, close: bool = False) -> xr.DataTree:

def _from_datatree(self, tree: xr.DataTree) -> None:
"""Restore the state of the manager from a DataTree object."""
if not self._is_datatree_workspace(tree):
raise ValueError("Not a valid workspace file")
for node in cast(ValuesView[xr.DataTree], (tree.values())):
self.add_tool(
ImageTool.from_dataset(node.to_dataset(inherit=False), _in_manager=True)
)
with erlab.interactive.utils.wait_dialog(self, "Loading workspace..."):
if not self._is_datatree_workspace(tree):
raise ValueError("Not a valid workspace file")
for node in cast(ValuesView[xr.DataTree], (tree.values())):
self.add_tool(
ImageTool.from_dataset(
node.to_dataset(inherit=False), _in_manager=True
)
)

def _is_datatree_workspace(self, tree: xr.DataTree) -> bool:
"""Check if the given DataTree object is a valid workspace file."""
Expand Down Expand Up @@ -771,8 +785,7 @@ def load(self, *, native: bool = True) -> None:
fname = dialog.selectedFiles()[0]
self._recent_directory = os.path.dirname(fname)
try:
with erlab.interactive.utils.wait_dialog(self, "Loading workspace..."):
self._from_datatree(xr.open_datatree(fname, engine="h5netcdf"))
self._from_datatree(xr.open_datatree(fname, engine="h5netcdf"))
except Exception as e:
logger.exception("Error while loading workspace")
QtWidgets.QMessageBox.critical(
Expand Down Expand Up @@ -929,11 +942,9 @@ def dropEvent(self, event: QtGui.QDropEvent | None) -> None:
)
return

msg = f"Loading {'file' if len(file_paths) == 1 else 'files'}..."
with erlab.interactive.utils.wait_dialog(self, msg):
self.open_multiple_files(
file_paths, try_workspace=extensions == {".h5"}
)
self.open_multiple_files(
file_paths, try_workspace=extensions == {".h5"}
)

def _show_loaded_info(
self,
Expand Down Expand Up @@ -966,6 +977,10 @@ def _show_loaded_info(
) # Remove duplicate entries

n_done, n_fail = len(loaded), len(failed)

status_msg = f"Loaded {n_done} {'file' if n_done == 1 else 'files'}"
self._status_bar.showMessage(status_msg, 5000)

if n_fail == 0:
return

Expand Down Expand Up @@ -1077,40 +1092,20 @@ def _add_from_multiple_files(
kwargs: dict[str, Any],
retry_callback: Callable,
) -> None:
for p in list(queued):
queued.remove(p)
try:
data_list = erlab.interactive.imagetool.core._parse_input(
func(p, **kwargs)
)
except Exception as e:
logger.exception("Error loading data from %s", p)
failed.append(p)
msg_box = QtWidgets.QMessageBox(self)
msg_box.setIcon(QtWidgets.QMessageBox.Icon.Warning)
msg_box.setText(f"Failed to load {p.name}")
msg_box.setInformativeText(
"Do you want to skip this file and continue loading?"
)
msg_box.setStandardButtons(
QtWidgets.QMessageBox.StandardButton.Abort
| QtWidgets.QMessageBox.StandardButton.Yes
)
msg_box.setDefaultButton(QtWidgets.QMessageBox.StandardButton.Yes)
msg_box.setDetailedText(f"{type(e).__name__}: {e}")
match msg_box.exec():
case QtWidgets.QMessageBox.StandardButton.Yes:
continue
case QtWidgets.QMessageBox.StandardButton.Abort:
break
else:
flags = self._data_recv(data_list, kwargs={"file_path": p})
if not all(flags):
failed.append(p)
else:
loaded.append(p)
handler = _MultiFileHandler(self, queued, func, kwargs)
self._file_handlers.add(handler)

def _finished_callback() -> None:
self._show_loaded_info(
loaded + handler.loaded,
handler.queued,
failed + handler.failed,
retry_callback=retry_callback,
)
self._file_handlers.remove(handler)

self._show_loaded_info(loaded, queued, failed, retry_callback=retry_callback)
handler.sigFinished.connect(_finished_callback)
handler.start()

def add_widget(self, widget: QtWidgets.QWidget) -> None:
"""Save a reference to an additional window widget.
Expand Down

0 comments on commit feeb06b

Please sign in to comment.