From 4bb0e6e75a465264eaacb9754eb49973d9c1e384 Mon Sep 17 00:00:00 2001 From: ed Date: Sat, 20 Apr 2024 22:15:08 +0000 Subject: [PATCH] pipe: windows: make it safe with aggressive flushing --- copyparty/__main__.py | 2 +- copyparty/httpcli.py | 19 +++++++++++++++---- copyparty/up2k.py | 6 ++++-- copyparty/util.py | 11 +++++++---- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index b5ed7e2c..931e187f 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -856,7 +856,7 @@ def add_qr(ap, tty): def add_fs(ap): ap2 = ap.add_argument_group("filesystem options") - rm_re_def = "5/0.1" if ANYWIN else "0/0" + rm_re_def = "15/0.1" if ANYWIN else "0/0" ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)") ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)") ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)") diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 8b260891..05483d08 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -3174,7 +3174,7 @@ def tx_pipe( tiers = ["uncapped", "reduced speed", "one byte per sec"] while lower < upper and not broken: - with self.pipes.lk: + with self.u2mutex: job = self.pipes.get(req_path) if not job: x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path) @@ -3183,7 +3183,8 @@ def tx_pipe( self.pipes.set(req_path, job) if not job: - t = "pipe: upload has finished; yeeting remainder" + t = "pipe: OK, upload has finished; yeeting remainder" + self.log(t, 2) data_end = file_size break @@ -3196,6 +3197,16 @@ def tx_pipe( data_end += chunk_size t = "pipe: can stream %.2f MiB; requested range is %.2f to %.2f" self.log(t % (data_end / M, lower / M, upper / M), 6) + with self.u2mutex: + if data_end > self.u2fh.aps.get(ap_data, data_end): + try: + fhs = self.u2fh.cache[ap_data].all_fhs + for fh in fhs: + fh.flush() + self.u2fh.aps[ap_data] = data_end + self.log("pipe: flushed %d up2k-FDs" % (len(fhs),)) + except Exception as ex: + self.log("pipe: u2fh flush failed: %r" % (ex,)) if lower >= data_end: if data_end: @@ -3238,7 +3249,7 @@ def tx_pipe( raise Exception("got 0 bytes (EOF?)") except Exception as ex: self.log("pipe: read failed at %.2f MiB: %s" % (lower / M, ex), 3) - with self.pipes.lk: + with self.u2mutex: self.pipes.c.pop(req_path, None) spins += 1 if spins > 3: @@ -3265,7 +3276,7 @@ def tx_pipe( if lower < upper and not broken: with open(req_path, "rb") as f: - remains = sendfile_py(self.log, lower, upper, f, self.s, bufsz, slp) + remains = sendfile_py(self.log, lower, upper, f, self.s, wr_sz, wr_slp) spd = self._spd((upper - lower) - remains) if self.do_log: diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 55b8411b..a62ad77d 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -147,7 +147,7 @@ def __init__(self, hub: "SvcHub") -> None: self.volsize: dict["sqlite3.Cursor", int] = {} self.volstate: dict[str, str] = {} self.vol_act: dict[str, float] = {} - self.busy_aps: set[str] = set() + self.busy_aps: dict[str, int] = {} self.dupesched: dict[str, list[tuple[str, str, float]]] = {} self.snap_prev: dict[str, Optional[tuple[int, float]]] = {} @@ -2515,7 +2515,9 @@ def _add_cv_tab(self, cur: "sqlite3.Cursor") -> None: cur.connection.commit() - def handle_json(self, cj: dict[str, Any], busy_aps: set[str]) -> dict[str, Any]: + def handle_json( + self, cj: dict[str, Any], busy_aps: dict[str, int] + ) -> dict[str, Any]: # busy_aps is u2fh (always undefined if -j0) so this is safe self.busy_aps = busy_aps got_lock = False diff --git a/copyparty/util.py b/copyparty/util.py index 6086aa8e..115170d5 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -761,7 +761,6 @@ def cln(self) -> None: class CachedDict(object): def __init__(self, maxage: float) -> None: - self.lk = threading.Lock() self.c: dict[str, tuple[float, Any]] = {} self.maxage = maxage self.oldest = 0.0 @@ -795,10 +794,11 @@ class CE(object): def __init__(self, fh: typing.BinaryIO) -> None: self.ts: float = 0 self.fhs = [fh] + self.all_fhs = set([fh]) def __init__(self) -> None: self.cache: dict[str, FHC.CE] = {} - self.aps: set[str] = set() + self.aps: dict[str, int] = {} def close(self, path: str) -> None: try: @@ -810,7 +810,7 @@ def close(self, path: str) -> None: fh.close() del self.cache[path] - self.aps.remove(path) + del self.aps[path] def clean(self) -> None: if not self.cache: @@ -831,9 +831,12 @@ def pop(self, path: str) -> typing.BinaryIO: return self.cache[path].fhs.pop() def put(self, path: str, fh: typing.BinaryIO) -> None: - self.aps.add(path) + if path not in self.aps: + self.aps[path] = 0 + try: ce = self.cache[path] + ce.all_fhs.add(fh) ce.fhs.append(fh) except: ce = self.CE(fh)