diff --git a/fileops.py b/fileops.py index 3f0d66d..f5a5acc 100644 --- a/fileops.py +++ b/fileops.py @@ -25,14 +25,15 @@ def cancel(self): self.progress_dialog.close() def showError(self, message: str): - # Close the progress dialog if it's visible; if it hasn't been shown yet, - # this will be effectively a no-op. - self.show_window_timer.timeout.disconnect() + # Disconnect the timer and close the progress dialog if visible. + try: + self.show_window_timer.timeout.disconnect() + except: + pass + if self.progress_dialog.isVisible(): self.progress_dialog.close() - self.show_window_timer.timeout.disconnect() - # Create and execute an error message box. err_box = QtWidgets.QMessageBox(self.parent) err_box.setIcon(QtWidgets.QMessageBox.Icon.Critical) @@ -40,25 +41,69 @@ def showError(self, message: str): err_box.setText(message) err_box.setStandardButtons(QtWidgets.QMessageBox.StandardButton.Ok) err_box.exec() - + + def askOverwrite(self, dest): + """ + Ask the user what to do if a destination file already exists. + Returns one of: + "this" - just overwrite this file, + "all" - overwrite all without asking again, + "none" - cancel the entire operation. + """ + msg_box = QtWidgets.QMessageBox(self.parent) + msg_box.setIcon(QtWidgets.QMessageBox.Icon.Question) + msg_box.setWindowTitle("Overwrite File?") + msg_box.setText(f"The file {dest!r} already exists.\nDo you want to overwrite it?") + this_button = msg_box.addButton("This", QtWidgets.QMessageBox.ButtonRole.AcceptRole) + all_button = msg_box.addButton("All", QtWidgets.QMessageBox.ButtonRole.AcceptRole) + none_button = msg_box.addButton("None", QtWidgets.QMessageBox.ButtonRole.RejectRole) + msg_box.exec() + + clicked = msg_box.clickedButton() + if clicked == this_button: + return "this" + elif clicked == all_button: + return "all" + else: + return "none" + def run(self, operations, op_type): # Check if there are any operations; if not, show an error immediately. if not operations: QtWidgets.QMessageBox.critical(self.parent, "Error", "No operations provided!") return - # Pre-check the operations: verify sources and calculate total size. + # Pre-check the operations: verify sources, prevent self-overwrite, confirm overwrites, and calculate total size. try: total_size = 0 - for src, _ in operations: + global_decision = None # Determines if "all" overwriting is confirmed. + valid_operations = [] + for src, dest in operations: + # Prevent overwriting itself by comparing normalized absolute paths. + if os.path.abspath(src) == os.path.abspath(dest): + raise ValueError(f"Source and destination are the same: {src}") + return + # Verify the source exists. if not os.path.exists(src): raise FileNotFoundError(f"Source file does not exist: {src}") + return + # Ask for confirmation if the destination file exists. + if os.path.exists(dest): + if global_decision is None: + decision = self.askOverwrite(dest) + if decision == "none": + QtWidgets.QMessageBox.information(self.parent, "Operation Cancelled", + "File operation cancelled by user.") + return # Cancel the entire operation. + elif decision == "all": + global_decision = "all" + # If decision is "this" or global_decision is already set to "all", proceed. + valid_operations.append((src, dest)) total_size += os.path.getsize(src) if total_size == 0: raise ValueError("The total size of the files to process is zero.") except Exception as e: QtWidgets.QMessageBox.critical(self.parent, "Error", str(e)) - # At this point the progress dialog has not been shown, so we simply return. return # At this point no errors, so show the progress dialog. @@ -66,26 +111,27 @@ def run(self, operations, op_type): self.operation_finished = False self.show_window_timer.start(1000) - self.op_thread = FileOperationThread(operations, op_type, total_size) + self.op_thread = FileOperationThread(valid_operations, op_type, total_size) self.op_thread.progress.connect(self.progress_dialog.setValue) - # Connect error signal to our method; this ensures that the progress dialog closes - # before or as the error dialog appears. self.op_thread.error.connect(self.showError) self.op_thread.finished.connect(self.operation_finished_slot) self.progress_dialog.canceled.connect(self.op_thread.cancel) self.op_thread.start() def show_progress_dialog(self): - # Only show the progress dialog if the operation hasn't finished yet - # and if the current progress is still less than 30%. + # Only show the dialog if the operation isn't finished yet and progress is still below 30%. if not self.operation_finished and self.progress_dialog.value() < 30: self.progress_dialog.show() def operation_finished_slot(self): self.operation_finished = True self.progress_dialog.close() - self.show_window_timer.timeout.disconnect() - self.parent.refresh_view() if hasattr(self.parent, "refresh_view") else None + try: + self.show_window_timer.timeout.disconnect() + except: + pass + if hasattr(self.parent, "refresh_view"): + self.parent.refresh_view() class FileOperationThread(QtCore.QThread): @@ -131,7 +177,6 @@ def run(self): return if self.op_type == "move": - # Verify that the destination file exists and has the same size. if os.path.exists(dest) and os.path.getsize(dest) == file_size: try: os.remove(src) @@ -154,17 +199,20 @@ def cancel(self): if __name__ == "__main__": # Test run of a copy operation. app = QtWidgets.QApplication([]) - # For testing purposes, use a simple QWidget as the parent. main_window = QtWidgets.QWidget() - # Define a dummy refresh_view method on the main window. + + # Dummy refresh_view method for testing purposes. def refresh_view(): print("Refreshed view.") main_window.refresh_view = refresh_view op = FileOperation(main_window) + + # Test case: trying to copy a file to itself should trigger an error. src = "test.txt" - dest = "test_copy.txt" + dest = "test.txt" # Using the same file for src and dest. operations = [(src, dest)] + op.run(operations, "copy") main_window.show() app.exec()