Version 0.5.14
abcache.fs: Treat an EOF from the HTTP server as the end of a file read instead of relying on `size >= end`
mos9527 committed Jan 10, 2025
1 parent 8a273be commit d9c39f9
Showing 5 changed files with 92 additions and 41 deletions.
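
A note on the gist of the change: the bundle size advertised by the server can be wrong, so UnidirectionalBlockCache now fetches sequential blocks until the HTTP fetcher returns an empty chunk, instead of stopping once `size >= end`. A minimal standalone sketch of that strategy (simplified from sssekai/abcache/fs.py; `fetch` here is a hypothetical stand-in for the HTTP range fetcher):

def read_all(fetch, blocksize=65536) -> bytes:
    """Read sequentially until the server signals EOF with an empty chunk,
    rather than trusting a possibly inaccurate advertised file size."""
    blocks = []
    while True:
        start = blocksize * len(blocks)
        block = fetch(start, start + blocksize)
        if not block:  # EOF: the fetcher returned b""
            break
        blocks.append(block)
    return b"".join(blocks)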
7 changes: 4 additions & 3 deletions .vscode/launch.json
@@ -1,6 +1,7 @@
{
"version": "0.2.0",
"configurations": [

{
"name": "Python: SpineExtract test",
"type": "debugpy",
@@ -46,9 +47,9 @@
"--app-region",
"jp",
"--app-version",
"4.1.1",
"5.0.0",
"--app-appHash",
"41fd71f2-f715-bc10-5852-0a9d8542f760"
"746b8607-0e65-489d-b060-a8986ba11b47"
],
"justMyCode": true
},
@@ -188,7 +189,7 @@
"--download-dir",
"~/.sssekai/bundles/",
"--download-filter",
"thumbnail/costume"
".*/stage_decoration/0344.*"
],
"justMyCode": false
},
2 changes: 1 addition & 1 deletion sssekai/__init__.py
@@ -1,5 +1,5 @@
__VERSION_MAJOR__ = 0
__VERSION_MINOR__ = 5
- __VERSION_PATCH__ = 13
+ __VERSION_PATCH__ = 14

__version__ = "%s.%s.%s" % (__VERSION_MAJOR__, __VERSION_MINOR__, __VERSION_PATCH__)
119 changes: 83 additions & 36 deletions sssekai/abcache/fs.py
@@ -7,7 +7,7 @@
from fsspec.archive import AbstractArchiveFileSystem
from requests import Response
from logging import getLogger
- from sssekai.crypto.AssetBundle import decrypt_iter
+ from sssekai.crypto.AssetBundle import decrypt_iter, SEKAI_AB_MAGIC
from . import AbCache, AbCacheEntry

logger = getLogger("abcache.fs")
@@ -20,28 +20,50 @@ class UnidirectionalBlockCache(BaseCache):
name: str = "unidirectional_blockcache"

def __init__(
- self, blocksize: int, fetcher: Callable[[int, int], bytes], size: int
+ self,
+ blocksize: int,
+ fetcher: Callable[[int, int], bytes],
+ size: int,
+ ignore_size: bool = False,
) -> None:
"""Create a unidirectional block cache.
Args:
blocksize (int): Block size in bytes.
fetcher (Callable[[int, int], bytes]): Fetcher function that takes start and end byte positions and returns the data.
size (int): Size of the file.
+ ignore_size (bool, optional): Don't truncate reads at the provided `size`. Defaults to False.
"""
super().__init__(blocksize, fetcher, size)
- self.nblocks = math.ceil(size / self.blocksize)
+ self.ignore_size = ignore_size
+ if not ignore_size:
+ self.nblocks = math.ceil(size / self.blocksize)
+ else:
+ self.nblocks = float("inf")
self.blocks = list()
+ self.eof = False

def __fetch_block(self, block_number):
assert block_number < self.nblocks, "block out of range"
- while len(self.blocks) - 1 < block_number:
- logger.debug("Fetching block %d" % (len(self.blocks)))
+ while not self.eof and len(self.blocks) - 1 < block_number:
start = self.blocksize * len(self.blocks)
end = start + self.blocksize
block = self.fetcher(start, end)
- self.blocks.append(block)
- return self.blocks[block_number]
+ if block:
+ self.blocks.append(block)
+ else:
+ self.eof = True
+ if block_number < len(self.blocks):
+ return self.blocks[block_number]
+ return b""  # EOF behavior when ignore_size is True

def _fetch(self, start: int | None, stop: int | None) -> bytes:
if start is None:
start = 0
if stop is None:
stop = self.size
- stop = min(stop, self.size)  # XXX: why didn't fsspec handle this?
+ if not self.ignore_size:
+ stop = min(stop, self.size)  # XXX: why didn't fsspec handle this?
if start >= self.size or start >= stop:
return b""
start_blk, start_pos = start // self.blocksize, start % self.blocksize
@@ -68,8 +90,11 @@ class AbCacheFile(AbstractBufferedFile):
- The fetched content is decrypted on the fly.
- Seeks are simulated by read-aheads (by UnidirectionalBlockCache). Meaning seek operations
will incur additional download (in-betweens will be cached as well).
+ - File sizes reported are *inaccurate* due to wrong values sent by the server.
+   Read until EOF, otherwise you will miss data.
"""
- DEFAULT_BLOCK_SIZE = 65536  # 64KB
+
+ DEFAULT_BLOCK_SIZE = 65536  # 64KB
entry: AbCacheEntry

@property
@@ -78,10 +103,17 @@ def session(self) -> AbCache:

@property
def entry(self) -> AbCacheEntry:
- entry = self.session.get_entry_by_bundle_name(self.path.strip('/'))
+ entry = self.session.get_entry_by_bundle_name(self.path.strip("/"))
assert entry is not None, "entry not found"
return entry

+ def read(self, length=-1):
+ if length < 0:
+ length = float("inf")
+ out = self.cache._fetch(self.loc, self.loc + length)
+ self.loc += len(out)
+ return out

def __init__(self, fs, bundle: str, block_size=None):
self.fs, self.path = fs, bundle
self.fetch_loc = 0
@@ -92,6 +124,9 @@ def __init__(self, fs, bundle: str, block_size=None):
mode="rb",
cache_type="unidirectional_blockcache",
size=self.entry.fileSize,
+ cache_options={"ignore_size": True},
+ # Sadly entry size could be *extremely* inaccurate.
+ # We have to ignore it and fetch until EOF.
)

@cached_property
@@ -104,7 +139,8 @@ def __resp(self) -> Response:
def __fetch(self):
def __innner():
for block in decrypt_iter(
- lambda nbytes: next(self.__resp.iter_content(nbytes), b''), self.blocksize
+ lambda nbytes: next(self.__resp.iter_content(nbytes), b""),
+ self.blocksize,
):
yield bytes(block)

@@ -113,12 +149,13 @@ def __innner():
def _fetch_range(self, start, end):
assert start - self.fetch_loc == 0, "can only fetch sequentially"
self.fetch_loc = end
- return next(self.__fetch, b'')
+ return next(self.__fetch, b"")


# Reference: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/libarchive.py
class AbCacheFilesystem(AbstractArchiveFileSystem):
"""Filesystem for reading from an AbCache on demand."""

root_marker = "/"
protocol = "abcache"
cache: AbCache
@@ -146,39 +183,49 @@ def __init__(self, fo: str = "", cache_obj: AbCache = None, *args, **kwargs):
def dir_cache(self):
# Reference implementation did O(n) per *every* ls() call
# We can make it O(1) with DP on tree preprocessing of O(nlogn)
bundles = self.cache.abcache_index.bundles
# Only the leaf nodes are given.
keys = set((self.root_marker + key for key in bundles.keys()))
keys |= self._all_dirnames(bundles.keys())
# Sorting implies DFS order.
keys = [self.root_marker] + [key for key in sorted(keys)]
- _trim = lambda key: key[len(self.root_marker):]
- nodes = [{
- "name": key,
- "type": "directory" if not _trim(key) in bundles else "file",
- "size": 0 if not _trim(key) in bundles else bundles[_trim(key)].fileSize,
- "item_count": 0,
- "file_count": 0,
- "total_size": 0
- } for key in keys]
+ _trim = lambda key: key[len(self.root_marker) :]
+ nodes = [
+ {
+ "name": key,
+ "type": "directory" if not _trim(key) in bundles else "file",
+ "size": (
+ 0 if not _trim(key) in bundles else bundles[_trim(key)].fileSize
+ ),
+ "item_count": 0,
+ "file_count": 0,
+ "total_size": 0,
+ }
+ for key in keys
+ ]
# Already in DFS order.
# Get start index for each directory and their item count.
stack = [0]
graph = defaultdict(list)
- table = {node['name']: index for index, node in enumerate(nodes)}
+ table = {node["name"]: index for index, node in enumerate(nodes)}

def is_file(name):
return _trim(name) in bundles

def is_parent_path(a, b):
# a is parent of b
- if a == self.root_marker: return True
+ if a == self.root_marker:
+ return True
return b.startswith(a + self.root_marker)

def maintain():
# Always starts from root. Safe to assume stack size >= 2
- u,v = stack[-2], stack[-1]
+ u, v = stack[-2], stack[-1]
nodes[u]["item_count"] += nodes[v]["item_count"]
nodes[u]["file_count"] += nodes[v]["file_count"]
nodes[u]["total_size"] += nodes[v]["total_size"]
stack.pop()

for index, name in enumerate(keys):
# Skip root
if index == 0:
@@ -188,15 +235,15 @@ def maintain():
pa = stack[-1]
nodes[pa]["item_count"] += 1
graph[pa].append(index)
if not is_file(name):
stack.append(index)
else:
nodes[pa]["file_count"] += 1
nodes[pa]["total_size"] += nodes[index]["size"]
nodes[index]["total_size"] = nodes[index]["size"]
while len(stack) >= 2:
maintain()
- assert nodes[0]['file_count'] == len(bundles), "file count mismatch"
+ assert nodes[0]["file_count"] == len(bundles), "file count mismatch"
return nodes, graph, table

def _get_dirs(self):
@@ -209,14 +256,14 @@ def info(self, path, **kwargs):
return nodes[table[path]]
else:
raise FileNotFoundError(path)

@cache
def ls(self, path, detail=True, **kwargs):
nodes, graph, table = self._get_dirs()
path = path or self.root_marker
if path in table:
u = table[path]
- return [nodes[v] if detail else nodes[v]['name'] for v in graph[u]]
+ return [nodes[v] if detail else nodes[v]["name"] for v in graph[u]]
return []

def open(self, path, mode="rb"):
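
An aside on dir_cache above: because the sorted key list enumerates paths in DFS order, a single pass with a stack of open directories aggregates item counts, file counts, and sizes bottom-up; afterwards every ls() is a dictionary lookup plus an adjacency-list read. A toy sketch of the same aggregation under simplified assumptions (a plain {path: size} dict instead of AbCache entries; all names here are made up):

from collections import defaultdict

def build_dir_cache(files: dict[str, int]):
    # Collect every implied directory; sorted paths enumerate the tree in DFS order.
    names = {"/"}
    for path in files:
        parts = path.split("/")
        for i in range(1, len(parts) + 1):
            names.add("/" + "/".join(parts[:i]))
    keys = sorted(names)
    nodes = {
        k: {"type": "file" if k[1:] in files else "directory",
            "total_size": files.get(k[1:], 0)}
        for k in keys
    }
    children = defaultdict(list)
    stack = []  # open directories along the current DFS path
    for key in keys:
        # Pop directories that are finished, folding their totals into the parent.
        while stack and stack[-1] != "/" and not key.startswith(stack[-1] + "/"):
            done = stack.pop()
            nodes[stack[-1]]["total_size"] += nodes[done]["total_size"]
        if stack:
            children[stack[-1]].append(key)
        if nodes[key]["type"] == "directory":
            stack.append(key)
        else:
            nodes[stack[-1]]["total_size"] += nodes[key]["total_size"]
    while len(stack) > 1:  # drain directories still open at the end
        done = stack.pop()
        nodes[stack[-1]]["total_size"] += nodes[done]["total_size"]
    return nodes, children

# e.g. build_dir_cache({"a/x.bin": 10, "a/y.bin": 20, "b.bin": 5}) gives
# nodes["/"]["total_size"] == 35 and children["/"] == ["/a", "/b.bin"].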
4 changes: 4 additions & 0 deletions sssekai/entrypoint/abdecrypt.py
@@ -4,8 +4,12 @@
def main_abdecrypt(args):
from sssekai.crypto.AssetBundle import decrypt_iter

+ args.outdir = os.path.abspath(args.outdir)
+ args.indir = os.path.abspath(args.indir)
tree = os.walk(args.indir)
for root, dirs, files in tree:
+ if root == args.outdir:
+ continue
for fname in files:
file = os.path.join(root, fname)
if os.path.isfile(file):
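
The abdecrypt change above normalizes both paths and skips the output directory while walking the input tree, so freshly decrypted files are not picked up again as inputs when the output directory is nested inside the input directory. A sketch of an equivalent guard (an alternative of my own, not what the commit does) that prunes os.walk in place so subdirectories of the output tree are never visited either:

import os

def iter_input_files(indir: str, outdir: str):
    indir, outdir = os.path.abspath(indir), os.path.abspath(outdir)
    for root, dirs, files in os.walk(indir):
        # Prune in place: os.walk honors edits to `dirs`, so the output
        # tree (including its subdirectories) is never descended into.
        dirs[:] = [d for d in dirs if os.path.join(root, d) != outdir]
        if root == outdir:
            continue
        for fname in files:
            yield os.path.join(root, fname)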
1 change: 0 additions & 1 deletion sssekai/entrypoint/abserve.py
@@ -75,7 +75,6 @@ def do_GET(self):
self.send_header("Content-type", "application/octet-stream")
# XXX: Size reported by bundles' metadata is not accurate.
# If a wrong size is reported, the browser will reject the download.
- # TODO: Figure out why the size is wrong.
# self.send_header("Content-Length", fs.stat(path)["size"])
self.end_headers()
with fs.open(path, "rb") as f:
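
For context on the abserve.py change: with no Content-Length header, an HTTP/1.0-style response is delimited by the connection closing, so the client simply reads until EOF; that is the same contract AbCacheFile now implements. A sketch of the handler body under those assumptions (stream_bundle is a hypothetical helper, not part of the commit; `handler` is assumed to be a BaseHTTPRequestHandler and `fs` an AbCacheFilesystem):

import shutil

def stream_bundle(handler, fs, path, chunk_size=65536):
    handler.send_response(200)
    handler.send_header("Content-Type", "application/octet-stream")
    # Content-Length is deliberately omitted: the size in the bundle metadata
    # may be wrong, and a wrong value makes the browser reject the download.
    handler.end_headers()
    with fs.open(path, "rb") as f:
        shutil.copyfileobj(f, handler.wfile, chunk_size)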
