Skip to content

Commit

Permalink
pipe: windows: make it safe with aggressive flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
9001 committed Apr 20, 2024
1 parent 2f7f9de commit 4bb0e6e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 11 deletions.
2 changes: 1 addition & 1 deletion copyparty/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ def add_qr(ap, tty):

def add_fs(ap):
ap2 = ap.add_argument_group("filesystem options")
rm_re_def = "5/0.1" if ANYWIN else "0/0"
rm_re_def = "15/0.1" if ANYWIN else "0/0"
ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)")
ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)")
ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)")
Expand Down
19 changes: 15 additions & 4 deletions copyparty/httpcli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3174,7 +3174,7 @@ def tx_pipe(
tiers = ["uncapped", "reduced speed", "one byte per sec"]

while lower < upper and not broken:
with self.pipes.lk:
with self.u2mutex:
job = self.pipes.get(req_path)
if not job:
x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path)
Expand All @@ -3183,7 +3183,8 @@ def tx_pipe(
self.pipes.set(req_path, job)

if not job:
t = "pipe: upload has finished; yeeting remainder"
t = "pipe: OK, upload has finished; yeeting remainder"
self.log(t, 2)
data_end = file_size
break

Expand All @@ -3196,6 +3197,16 @@ def tx_pipe(
data_end += chunk_size
t = "pipe: can stream %.2f MiB; requested range is %.2f to %.2f"
self.log(t % (data_end / M, lower / M, upper / M), 6)
with self.u2mutex:
if data_end > self.u2fh.aps.get(ap_data, data_end):
try:
fhs = self.u2fh.cache[ap_data].all_fhs
for fh in fhs:
fh.flush()
self.u2fh.aps[ap_data] = data_end
self.log("pipe: flushed %d up2k-FDs" % (len(fhs),))
except Exception as ex:
self.log("pipe: u2fh flush failed: %r" % (ex,))

if lower >= data_end:
if data_end:
Expand Down Expand Up @@ -3238,7 +3249,7 @@ def tx_pipe(
raise Exception("got 0 bytes (EOF?)")
except Exception as ex:
self.log("pipe: read failed at %.2f MiB: %s" % (lower / M, ex), 3)
with self.pipes.lk:
with self.u2mutex:
self.pipes.c.pop(req_path, None)
spins += 1
if spins > 3:
Expand All @@ -3265,7 +3276,7 @@ def tx_pipe(

if lower < upper and not broken:
with open(req_path, "rb") as f:
remains = sendfile_py(self.log, lower, upper, f, self.s, bufsz, slp)
remains = sendfile_py(self.log, lower, upper, f, self.s, wr_sz, wr_slp)

spd = self._spd((upper - lower) - remains)
if self.do_log:
Expand Down
6 changes: 4 additions & 2 deletions copyparty/up2k.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def __init__(self, hub: "SvcHub") -> None:
self.volsize: dict["sqlite3.Cursor", int] = {}
self.volstate: dict[str, str] = {}
self.vol_act: dict[str, float] = {}
self.busy_aps: set[str] = set()
self.busy_aps: dict[str, int] = {}
self.dupesched: dict[str, list[tuple[str, str, float]]] = {}
self.snap_prev: dict[str, Optional[tuple[int, float]]] = {}

Expand Down Expand Up @@ -2515,7 +2515,9 @@ def _add_cv_tab(self, cur: "sqlite3.Cursor") -> None:

cur.connection.commit()

def handle_json(self, cj: dict[str, Any], busy_aps: set[str]) -> dict[str, Any]:
def handle_json(
self, cj: dict[str, Any], busy_aps: dict[str, int]
) -> dict[str, Any]:
# busy_aps is u2fh (always undefined if -j0) so this is safe
self.busy_aps = busy_aps
got_lock = False
Expand Down
11 changes: 7 additions & 4 deletions copyparty/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,6 @@ def cln(self) -> None:

class CachedDict(object):
def __init__(self, maxage: float) -> None:
self.lk = threading.Lock()
self.c: dict[str, tuple[float, Any]] = {}
self.maxage = maxage
self.oldest = 0.0
Expand Down Expand Up @@ -795,10 +794,11 @@ class CE(object):
def __init__(self, fh: typing.BinaryIO) -> None:
self.ts: float = 0
self.fhs = [fh]
self.all_fhs = set([fh])

def __init__(self) -> None:
self.cache: dict[str, FHC.CE] = {}
self.aps: set[str] = set()
self.aps: dict[str, int] = {}

def close(self, path: str) -> None:
try:
Expand All @@ -810,7 +810,7 @@ def close(self, path: str) -> None:
fh.close()

del self.cache[path]
self.aps.remove(path)
del self.aps[path]

def clean(self) -> None:
if not self.cache:
Expand All @@ -831,9 +831,12 @@ def pop(self, path: str) -> typing.BinaryIO:
return self.cache[path].fhs.pop()

def put(self, path: str, fh: typing.BinaryIO) -> None:
self.aps.add(path)
if path not in self.aps:
self.aps[path] = 0

try:
ce = self.cache[path]
ce.all_fhs.add(fh)
ce.fhs.append(fh)
except:
ce = self.CE(fh)
Expand Down

0 comments on commit 4bb0e6e

Please sign in to comment.