Skip to content

Commit

Permalink
Add more checking
Browse files Browse the repository at this point in the history
  • Loading branch information
probonopd committed Feb 15, 2025
1 parent 965dec3 commit abdb1c8
Showing 1 changed file with 68 additions and 20 deletions.
88 changes: 68 additions & 20 deletions fileops.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,67 +25,113 @@ def cancel(self):
self.progress_dialog.close()

def showError(self, message: str):
# Close the progress dialog if it's visible; if it hasn't been shown yet,
# this will be effectively a no-op.
self.show_window_timer.timeout.disconnect()
# Disconnect the timer and close the progress dialog if visible.
try:
self.show_window_timer.timeout.disconnect()
except:
pass

if self.progress_dialog.isVisible():
self.progress_dialog.close()

self.show_window_timer.timeout.disconnect()

# Create and execute an error message box.
err_box = QtWidgets.QMessageBox(self.parent)
err_box.setIcon(QtWidgets.QMessageBox.Icon.Critical)
err_box.setWindowTitle("Error")
err_box.setText(message)
err_box.setStandardButtons(QtWidgets.QMessageBox.StandardButton.Ok)
err_box.exec()


def askOverwrite(self, dest):
"""
Ask the user what to do if a destination file already exists.
Returns one of:
"this" - just overwrite this file,
"all" - overwrite all without asking again,
"none" - cancel the entire operation.
"""
msg_box = QtWidgets.QMessageBox(self.parent)
msg_box.setIcon(QtWidgets.QMessageBox.Icon.Question)
msg_box.setWindowTitle("Overwrite File?")
msg_box.setText(f"The file {dest!r} already exists.\nDo you want to overwrite it?")
this_button = msg_box.addButton("This", QtWidgets.QMessageBox.ButtonRole.AcceptRole)
all_button = msg_box.addButton("All", QtWidgets.QMessageBox.ButtonRole.AcceptRole)
none_button = msg_box.addButton("None", QtWidgets.QMessageBox.ButtonRole.RejectRole)
msg_box.exec()

clicked = msg_box.clickedButton()
if clicked == this_button:
return "this"
elif clicked == all_button:
return "all"
else:
return "none"

def run(self, operations, op_type):
# Check if there are any operations; if not, show an error immediately.
if not operations:
QtWidgets.QMessageBox.critical(self.parent, "Error", "No operations provided!")
return

# Pre-check the operations: verify sources and calculate total size.
# Pre-check the operations: verify sources, prevent self-overwrite, confirm overwrites, and calculate total size.
try:
total_size = 0
for src, _ in operations:
global_decision = None # Determines if "all" overwriting is confirmed.
valid_operations = []
for src, dest in operations:
# Prevent overwriting itself by comparing normalized absolute paths.
if os.path.abspath(src) == os.path.abspath(dest):
raise ValueError(f"Source and destination are the same: {src}")
return
# Verify the source exists.
if not os.path.exists(src):
raise FileNotFoundError(f"Source file does not exist: {src}")
return
# Ask for confirmation if the destination file exists.
if os.path.exists(dest):
if global_decision is None:
decision = self.askOverwrite(dest)
if decision == "none":
QtWidgets.QMessageBox.information(self.parent, "Operation Cancelled",
"File operation cancelled by user.")
return # Cancel the entire operation.
elif decision == "all":
global_decision = "all"
# If decision is "this" or global_decision is already set to "all", proceed.
valid_operations.append((src, dest))
total_size += os.path.getsize(src)
if total_size == 0:
raise ValueError("The total size of the files to process is zero.")
except Exception as e:
QtWidgets.QMessageBox.critical(self.parent, "Error", str(e))
# At this point the progress dialog has not been shown, so we simply return.
return

# At this point no errors, so show the progress dialog.
self.progress_dialog.setValue(0)
self.operation_finished = False
self.show_window_timer.start(1000)

self.op_thread = FileOperationThread(operations, op_type, total_size)
self.op_thread = FileOperationThread(valid_operations, op_type, total_size)
self.op_thread.progress.connect(self.progress_dialog.setValue)
# Connect error signal to our method; this ensures that the progress dialog closes
# before or as the error dialog appears.
self.op_thread.error.connect(self.showError)
self.op_thread.finished.connect(self.operation_finished_slot)
self.progress_dialog.canceled.connect(self.op_thread.cancel)
self.op_thread.start()

def show_progress_dialog(self):
# Only show the progress dialog if the operation hasn't finished yet
# and if the current progress is still less than 30%.
# Only show the dialog if the operation isn't finished yet and progress is still below 30%.
if not self.operation_finished and self.progress_dialog.value() < 30:
self.progress_dialog.show()

def operation_finished_slot(self):
self.operation_finished = True
self.progress_dialog.close()
self.show_window_timer.timeout.disconnect()
self.parent.refresh_view() if hasattr(self.parent, "refresh_view") else None
try:
self.show_window_timer.timeout.disconnect()
except:
pass
if hasattr(self.parent, "refresh_view"):
self.parent.refresh_view()


class FileOperationThread(QtCore.QThread):
Expand Down Expand Up @@ -131,7 +177,6 @@ def run(self):
return

if self.op_type == "move":
# Verify that the destination file exists and has the same size.
if os.path.exists(dest) and os.path.getsize(dest) == file_size:
try:
os.remove(src)
Expand All @@ -154,17 +199,20 @@ def cancel(self):
if __name__ == "__main__":
# Test run of a copy operation.
app = QtWidgets.QApplication([])
# For testing purposes, use a simple QWidget as the parent.
main_window = QtWidgets.QWidget()
# Define a dummy refresh_view method on the main window.

# Dummy refresh_view method for testing purposes.
def refresh_view():
print("Refreshed view.")
main_window.refresh_view = refresh_view

op = FileOperation(main_window)

# Test case: trying to copy a file to itself should trigger an error.
src = "test.txt"
dest = "test_copy.txt"
dest = "test.txt" # Using the same file for src and dest.
operations = [(src, dest)]

op.run(operations, "copy")
main_window.show()
app.exec()

0 comments on commit abdb1c8

Please sign in to comment.