Implement chunk comparison and selective extraction
- Add compare_and_extract_chunks functionality
- Add comprehensive test coverage
- Fix file state tracking with st parameter
alighazi288 committed Jan 20, 2025
1 parent b9498ca commit 57760ef
Showing 2 changed files with 227 additions and 2 deletions.
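For context, the core idea of the new compare_and_extract_chunks helper (first hunk below) is: read the existing on-disk file in the archived chunk sizes, hash each read, and fetch from the repository only the chunks whose hash or size differs. The following is a minimal standalone sketch of that comparison step; ChunkEntry, id_hash, and chunks_needing_fetch are simplified stand-ins for illustration, not borg's actual API.

from collections import namedtuple
from hashlib import sha256

# Simplified stand-ins (hypothetical names), not borg's real ChunkListEntry / key.id_hash.
ChunkEntry = namedtuple("ChunkEntry", ["id", "size"])

def id_hash(data):
    return sha256(data).digest()

def chunks_needing_fetch(fs_path, archived_chunks):
    """Return only the archived chunks whose on-disk counterpart differs."""
    needed = []
    with open(fs_path, "rb") as f:
        for chunk in archived_chunks:
            data = f.read(chunk.size)  # read exactly the archived chunk's size
            if len(data) != chunk.size or id_hash(data) != chunk.id:
                needed.append(chunk)
    return needed

A short read (end of file reached) and a hash mismatch both mark a chunk as needing a fetch, which is the behaviour exercised by the fs_file_shorter and empty_fs_file test cases further down.
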
64 changes: 63 additions & 1 deletion src/borg/archive.py
@@ -719,6 +719,63 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
# In this case, we *want* to extract twice, because there is no other way.
pass

def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
"""Compare file chunks and patch if needed. Returns True if patching succeeded."""
if st is None:
return False
try:
# First pass: Build fs chunks list
fs_chunks = []
with backup_io("open"):
fs_file = open(fs_path, "rb")
with fs_file:
for chunk in item.chunks:
with backup_io("read"):
data = fs_file.read(chunk.size)

fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))

# Compare chunks and collect needed chunk IDs
needed_chunks = []
for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
if fs_chunk != item_chunk:
needed_chunks.append(item_chunk)

# Fetch all needed chunks and iterate through ALL of them
chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)

# Second pass: Update file
with backup_io("open"):
fs_file = open(fs_path, "rb+")
with fs_file:
for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
if fs_chunk == item_chunk:
with backup_io("seek"):
fs_file.seek(item_chunk.size, 1)
else:
chunk_data = next(chunk_data_iter)

with backup_io("write"):
fs_file.write(chunk_data)
if pi:
pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])

final_size = fs_file.tell()
with backup_io("truncate_and_attrs"):
fs_file.truncate(item.size)
fs_file.flush()
self.restore_attrs(fs_path, item, fd=fs_file.fileno())

if "size" in item and item.size != final_size:
raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")

if "chunks_healthy" in item and not item.chunks_healthy:
raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")

return True
except OSError:
return False

def extract_item(
self,
item,
@@ -802,12 +859,14 @@ def same_item(item, st):
return # done! we already have fully extracted this file in a previous run.
elif stat.S_ISDIR(st.st_mode):
os.rmdir(path)
st = None
else:
os.unlink(path)
st = None
except UnicodeEncodeError:
raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
except OSError:
pass
st = None

def make_parent(path):
parent_dir = os.path.dirname(path)
@@ -821,6 +880,9 @@ def make_parent(path):
with self.extract_helper(item, path, hlm) as hardlink_set:
if hardlink_set:
return
if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
return

with backup_io("open"):
fd = open(path, "wb")
with fd:
165 changes: 164 additions & 1 deletion src/borg/testsuite/archive_test.py
@@ -12,7 +12,7 @@
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
from ..helpers import msgpack
from ..item import Item, ArchiveItem
from ..item import Item, ArchiveItem, ChunkListEntry
from ..manifest import Manifest
from ..platform import uid2user, gid2group, is_win32

@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
self.objects[id] = data
return id, len(data)

def fetch_many(self, ids, ro_type=None):
"""Mock implementation of fetch_many"""
for id in ids:
yield self.objects[id]


def test_cache_chunk_buffer():
data = [Item(path="p1"), Item(path="p2")]
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
for path in rejected_dotdot_paths:
with pytest.raises(ValueError, match="unexpected '..' element in path"):
Item(path=path, user="root", group="root")


@pytest.fixture
def setup_extractor(tmpdir):
"""Setup common test infrastructure"""

class MockCache:
def __init__(self):
self.objects = {}

repository = Mock()
key = PlaintextKey(repository)
manifest = Manifest(key, repository)
cache = MockCache()

extractor = Archive(manifest=manifest, name="test", create=True)
extractor.pipeline = cache
extractor.key = key
extractor.cwd = str(tmpdir)
extractor.restore_attrs = Mock()

# Track fetched chunks across tests
fetched_chunks = []

def create_mock_chunks(item_data, chunk_size=4):
chunks = []
for i in range(0, len(item_data), chunk_size):
chunk_data = item_data[i : i + chunk_size]
chunk_id = key.id_hash(chunk_data)
chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
cache.objects[chunk_id] = chunk_data

item = Mock(spec=["chunks", "size", "__contains__", "get"])
item.chunks = chunks
item.size = len(item_data)
item.__contains__ = lambda self, item: item == "size"

return item, str(tmpdir.join("test.txt"))

def mock_fetch_many(chunk_ids, ro_type=None):
fetched_chunks.extend(chunk_ids)
return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])

def clear_fetched_chunks():
fetched_chunks.clear()

def get_fetched_chunks():
return fetched_chunks

cache.fetch_many = mock_fetch_many

return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks


@pytest.mark.parametrize(
"name, item_data, fs_data, expected_fetched_chunks",
[
(
"no_changes",
b"1111", # One complete chunk, no changes needed
b"1111", # Identical content
0, # No chunks should be fetched
),
(
"single_chunk_change",
b"11112222", # Two chunks
b"1111XXXX", # Second chunk different
1, # Only second chunk should be fetched
),
(
"cross_boundary_change",
b"11112222", # Two chunks
b"111XX22", # Change crosses chunk boundary
2, # Both chunks need update
),
(
"exact_multiple_chunks",
b"11112222333", # Three chunks (last one partial)
b"1111XXXX333", # Middle chunk different
1, # Only middle chunk fetched
),
(
"first_chunk_change",
b"11112222", # Two chunks
b"XXXX2222", # First chunk different
1, # Only first chunk should be fetched
),
(
"all_chunks_different",
b"11112222", # Two chunks
b"XXXXYYYY", # Both chunks different
2, # Both chunks should be fetched
),
(
"partial_last_chunk",
b"111122", # One full chunk + partial
b"1111XX", # Partial chunk different
1, # Only second chunk should be fetched
),
(
"fs_file_shorter",
b"11112222", # Two chunks in archive
b"111122", # Shorter on disk - missing part of second chunk
1, # Should fetch second chunk
),
(
"fs_file_longer",
b"11112222", # Two chunks in archive
b"1111222233", # Longer on disk
0, # Should fetch no chunks since content matches up to archive length
),
(
"empty_archive_file",
b"", # Empty in archive
b"11112222", # Content on disk
0, # No chunks to compare = no chunks to fetch
),
(
"empty_fs_file",
b"11112222", # Two chunks in archive
b"", # Empty on disk
2, # Should fetch all chunks since file is empty
),
],
)
def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
"""Test chunk comparison and extraction"""
extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
clear_fetched_chunks()

chunk_size = 4
item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)

original_chunk_ids = [chunk.id for chunk in item.chunks]

with open(target_path, "wb") as f:
f.write(fs_data)

st = os.stat(target_path)
result = extractor.compare_and_extract_chunks(item, target_path, st=st)
assert result

fetched_chunks = get_fetched_chunks()
assert len(fetched_chunks) == expected_fetched_chunks

# For single chunk changes, verify it's the correct chunk
if expected_fetched_chunks == 1:
item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]

# Find which chunk should have changed by comparing item_data with fs_data
for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
if item_chunk != fs_chunk:
assert fetched_chunks[0] == original_chunk_ids[i]
break

with open(target_path, "rb") as f:
assert f.read() == item_data
