diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 6f7349b6..4f02dd69 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -858,6 +858,7 @@ def add_fs(ap): ap2 = ap.add_argument_group("filesystem options") rm_re_def = "5/0.1" if ANYWIN else "0/0" ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)") + ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)") ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)") diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 7e2cbcde..bd6f6cab 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -1764,13 +1764,14 @@ def _reload(self) -> None: if k in vol.flags: vol.flags[k] = float(vol.flags[k]) - try: - zs1, zs2 = vol.flags["rm_retry"].split("/") - vol.flags["rm_re_t"] = float(zs1) - vol.flags["rm_re_r"] = float(zs2) - except: - t = 'volume "/%s" has invalid rm_retry [%s]' - raise Exception(t % (vol.vpath, vol.flags.get("rm_retry"))) + for k in ("mv_re", "rm_re"): + try: + zs1, zs2 = vol.flags[k + "try"].split("/") + vol.flags[k + "_t"] = float(zs1) + vol.flags[k + "_r"] = float(zs2) + except: + t = 'volume "/%s" has invalid %stry [%s]' + raise Exception(t % (vol.vpath, k, vol.flags.get(k + "try"))) for k1, k2 in IMPLICATIONS: if k1 in vol.flags: diff --git a/copyparty/cert.py b/copyparty/cert.py index 6b40dea4..8aff3609 100644 --- a/copyparty/cert.py +++ b/copyparty/cert.py @@ -6,7 +6,8 @@ import shutil import time -from .util import Netdev, runcmd +from .__init__ import ANYWIN +from .util import Netdev, runcmd, wrename, wunlink HAVE_CFSSL = True @@ -14,6 +15,12 @@ from .util import RootLogger +if ANYWIN: + VF = {"mv_re_t": 5, "rm_re_t": 5, "mv_re_r": 0.1, "rm_re_r": 0.1} +else: + VF = {"mv_re_t": 0, "rm_re_t": 0} + + def ensure_cert(log: "RootLogger", args) -> None: """ the default cert (and the entire TLS support) is only here to enable the @@ -105,8 +112,12 @@ def _gen_ca(log: "RootLogger", args): raise Exception("failed to translate ca-cert: {}, {}".format(rc, se), 3) bname = os.path.join(args.crt_dir, "ca") - os.rename(bname + "-key.pem", bname + ".key") - os.unlink(bname + ".csr") + try: + wunlink(log, bname + ".key", VF) + except: + pass + wrename(log, bname + "-key.pem", bname + ".key", VF) + wunlink(log, bname + ".csr", VF) log("cert", "new ca OK", 2) @@ -185,11 +196,11 @@ def _gen_srv(log: "RootLogger", args, netdevs: dict[str, Netdev]): bname = os.path.join(args.crt_dir, "srv") try: - os.unlink(bname + ".key") + wunlink(log, bname + ".key", VF) except: pass - os.rename(bname + "-key.pem", bname + ".key") - os.unlink(bname + ".csr") + wrename(log, bname + "-key.pem", bname + ".key", VF) + wunlink(log, bname + ".csr", VF) with open(os.path.join(args.crt_dir, "ca.pem"), "rb") as f: ca = f.read() diff --git a/copyparty/cfg.py b/copyparty/cfg.py index 9781979f..a878b8df 100644 --- a/copyparty/cfg.py +++ b/copyparty/cfg.py @@ -63,6 +63,7 @@ def vf_vmap() -> dict[str, str]: "lg_sbf", "md_sbf", "nrand", + "mv_retry", "rm_retry", "sort", "unlist", @@ -214,6 +215,7 @@ def vf_cmap() -> dict[str, str]: "dots": "allow all users with read-access to\nenable the option to show dotfiles in listings", "fk=8": 'generates per-file accesskeys,\nwhich are then required at the "g" permission;\nkeys are invalidated if filesize or inode changes', "fka=8": 'generates slightly weaker per-file accesskeys,\nwhich are then required at the "g" permission;\nnot affected by filesize or inode numbers', + "mv_retry": "ms-windows: timeout for renaming busy files", "rm_retry": "ms-windows: timeout for deleting busy files", "davauth": "ask webdav clients to login for all folders", "davrt": "show lastmod time of symlink destination, not the link itself\n(note: this option is always enabled for recursive listings)", diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 55467694..7dc88748 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -89,6 +89,7 @@ vjoin, vol_san, vsplit, + wrename, wunlink, yieldfile, ) @@ -1804,7 +1805,7 @@ def dump_to_file(self, is_put: bool) -> tuple[int, str, str, int, str, str]: f, fn = zfw["orz"] path2 = os.path.join(fdir, fn2) - atomic_move(path, path2) + atomic_move(self.log, path, path2, vfs.flags) fn = fn2 path = path2 @@ -1885,7 +1886,9 @@ def handle_stash(self, is_put: bool) -> bool: self.reply(t.encode("utf-8"), 201, headers=h) return True - def bakflip(self, f: typing.BinaryIO, ofs: int, sz: int, sha: str) -> None: + def bakflip( + self, f: typing.BinaryIO, ofs: int, sz: int, sha: str, flags: dict[str, Any] + ) -> None: if not self.args.bak_flips or self.args.nw: return @@ -1913,7 +1916,7 @@ def bakflip(self, f: typing.BinaryIO, ofs: int, sz: int, sha: str) -> None: if nrem: self.log("bakflip truncated; {} remains".format(nrem), 1) - atomic_move(fp, fp + ".trunc") + atomic_move(self.log, fp, fp + ".trunc", flags) else: self.log("bakflip ok", 2) @@ -2179,7 +2182,7 @@ def handle_post_binary(self) -> bool: if sha_b64 != chash: try: - self.bakflip(f, cstart[0], post_sz, sha_b64) + self.bakflip(f, cstart[0], post_sz, sha_b64, vfs.flags) except: self.log("bakflip failed: " + min_ex()) @@ -2531,7 +2534,7 @@ def handle_plain_upload( raise if not nullwrite: - atomic_move(tabspath, abspath) + atomic_move(self.log, tabspath, abspath, vfs.flags) tabspath = "" @@ -2771,7 +2774,7 @@ def handle_text_upload(self) -> bool: hidedir(dp) except: pass - bos.rename(fp, os.path.join(mdir, ".hist", mfile2)) + wrename(self.log, fp, os.path.join(mdir, ".hist", mfile2), vfs.flags) assert self.parser.gen p_field, _, p_data = next(self.parser.gen) diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 89b646fa..908985fb 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -550,6 +550,13 @@ def _process_config(self) -> bool: except: raise Exception("invalid --rm-retry [%s]" % (self.args.rm_retry,)) + try: + zf1, zf2 = self.args.mv_retry.split("/") + self.args.mv_re_t = float(zf1) + self.args.mv_re_r = float(zf2) + except: + raise Exception("invalid --mv-retry [%s]" % (self.args.mv_retry,)) + return True def _ipa2re(self, txt) -> Optional[re.Pattern]: diff --git a/copyparty/th_srv.py b/copyparty/th_srv.py index cd5d1a99..cb673983 100644 --- a/copyparty/th_srv.py +++ b/copyparty/th_srv.py @@ -28,6 +28,7 @@ runcmd, statdir, vsplit, + wrename, wunlink, ) @@ -346,7 +347,7 @@ def worker(self) -> None: pass try: - bos.rename(ttpath, tpath) + wrename(self.log, ttpath, tpath, vn.flags) except: pass diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 39f386a0..a048c9da 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -91,6 +91,9 @@ HINT_HISTPATH = "you could try moving the database to another location (preferably an SSD or NVME drive) using either the --hist argument (global option for all volumes), or the hist volflag (just for this volume)" +VF_CAREFUL = {"mv_re_t": 5, "rm_re_t": 5, "mv_re_r": 0.1, "rm_re_r": 0.1} + + class Dbw(object): def __init__(self, c: "sqlite3.Cursor", n: int, t: float) -> None: self.c = c @@ -869,7 +872,7 @@ def register_vpath( ft = "\033[0;32m{}{:.0}" ff = "\033[0;35m{}{:.0}" fv = "\033[0;36m{}:\033[90m{}" - fx = set(("html_head", "rm_re_t", "rm_re_r")) + fx = set(("html_head", "rm_re_t", "rm_re_r", "mv_re_t", "mv_re_r")) fd = vf_bmap() fd.update(vf_cmap()) fd.update(vf_vmap()) @@ -3044,12 +3047,11 @@ def _finish_upload(self, ptop: str, wark: str) -> None: t = "finish_upload {} with remaining chunks {}" raise Pebkac(500, t.format(wark, job["need"])) - # self.log("--- " + wark + " " + dst + " finish_upload atomic " + dst, 4) - atomic_move(src, dst) - upt = job.get("at") or time.time() vflags = self.flags[ptop] + atomic_move(self.log, src, dst, vflags) + times = (int(time.time()), int(job["lmod"])) self.log( "no more chunks, setting times {} ({}) on {}".format( @@ -3653,7 +3655,7 @@ def _mv_file( self._symlink(dlink, dabs, dvn.flags, lmod=ftime) wunlink(self.log, sabs, svn.flags) else: - atomic_move(sabs, dabs) + atomic_move(self.log, sabs, dabs, svn.flags) except OSError as ex: if ex.errno != errno.EXDEV: @@ -3830,8 +3832,7 @@ def _relink(self, wark: str, sptop: str, srem: str, dabs: str) -> int: self.log("linkswap [{}] and [{}]".format(sabs, slabs)) mt = bos.path.getmtime(slabs, False) flags = self.flags.get(ptop) or {} - wunlink(self.log, slabs, flags) - bos.rename(sabs, slabs) + atomic_move(self.log, sabs, slabs, flags) bos.utime(slabs, (int(time.time()), int(mt)), False) self._symlink(slabs, sabs, flags, False) full[slabs] = (ptop, rem) @@ -4142,7 +4143,7 @@ def _snap_reg(self, ptop: str, reg: dict[str, dict[str, Any]]) -> None: with gzip.GzipFile(path2, "wb") as f: f.write(j) - atomic_move(path2, path) + atomic_move(self.log, path2, path, VF_CAREFUL) self.log("snap: {} |{}|".format(path, len(reg.keys()))) self.snap_prev[ptop] = etag diff --git a/copyparty/util.py b/copyparty/util.py index dcb91017..f7211d71 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -2125,26 +2125,29 @@ def lsof(log: "NamedLogger", abspath: str) -> None: log("lsof failed; " + min_ex(), 3) -def atomic_move(usrc: str, udst: str) -> None: - src = fsenc(usrc) - dst = fsenc(udst) - if not PY2: - os.replace(src, dst) +def _fs_mvrm( + log: "NamedLogger", src: str, dst: str, atomic: bool, flags: dict[str, Any] +) -> bool: + bsrc = fsenc(src) + bdst = fsenc(dst) + if atomic: + k = "mv_re_" + act = "atomic-rename" + osfun = os.replace + args = [bsrc, bdst] + elif dst: + k = "mv_re_" + act = "rename" + osfun = os.rename + args = [bsrc, bdst] else: - if os.path.exists(dst): - os.unlink(dst) - - os.rename(src, dst) - - -def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool: - maxtime = flags.get("rm_re_t", 0.0) - bpath = fsenc(abspath) - if not maxtime: - os.unlink(bpath) - return True + k = "rm_re_" + act = "delete" + osfun = os.unlink + args = [bsrc] - chill = flags.get("rm_re_r", 0.0) + maxtime = flags.get(k + "t", 0.0) + chill = flags.get(k + "r", 0.0) if chill < 0.001: chill = 0.1 @@ -2152,14 +2155,19 @@ def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool: t0 = now = time.time() for attempt in range(90210): try: - if ino and os.stat(bpath).st_ino != ino: - log("inode changed; aborting delete") + if ino and os.stat(bsrc).st_ino != ino: + t = "src inode changed; aborting %s %s" + log(t % (act, src), 1) return False - os.unlink(bpath) + if (dst and not atomic) and os.path.exists(bdst): + t = "something appeared at dst; aborting rename [%s] ==> [%s]" + log(t % (src, dst), 1) + return False + osfun(*args) if attempt: now = time.time() - t = "deleted in %.2f sec, attempt %d" - log(t % (now - t0, attempt + 1)) + t = "%sd in %.2f sec, attempt %d: %s" + log(t % (act, now - t0, attempt + 1, src)) return True except OSError as ex: now = time.time() @@ -2169,15 +2177,45 @@ def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool: raise if not attempt: if not PY2: - ino = os.stat(bpath).st_ino - t = "delete failed (err.%d); retrying for %d sec: %s" - log(t % (ex.errno, maxtime + 0.99, abspath)) + ino = os.stat(bsrc).st_ino + t = "%s failed (err.%d); retrying for %d sec: [%s]" + log(t % (act, ex.errno, maxtime + 0.99, src)) time.sleep(chill) return False # makes pylance happy +def atomic_move(log: "NamedLogger", src: str, dst: str, flags: dict[str, Any]) -> None: + bsrc = fsenc(src) + bdst = fsenc(dst) + if PY2: + if os.path.exists(bdst): + _fs_mvrm(log, dst, "", False, flags) # unlink + + _fs_mvrm(log, src, dst, False, flags) # rename + elif flags.get("mv_re_t"): + _fs_mvrm(log, src, dst, True, flags) + else: + os.replace(bsrc, bdst) + + +def wrename(log: "NamedLogger", src: str, dst: str, flags: dict[str, Any]) -> bool: + if not flags.get("mv_re_t"): + os.rename(fsenc(src), fsenc(dst)) + return True + + return _fs_mvrm(log, src, dst, False, flags) + + +def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool: + if not flags.get("rm_re_t"): + os.unlink(fsenc(abspath)) + return True + + return _fs_mvrm(log, abspath, "", False, flags) + + def get_df(abspath: str) -> tuple[Optional[int], Optional[int]]: try: # some fuses misbehave diff --git a/tests/util.py b/tests/util.py index 23396062..5b664ee0 100644 --- a/tests/util.py +++ b/tests/util.py @@ -155,6 +155,7 @@ def __init__(self, a=None, v=None, c=None, **ka0): mte={"a": True}, mth={}, mtp=[], + mv_retry="0/0", rm_retry="0/0", s_rd_sz=256 * 1024, s_wr_sz=256 * 1024,