Merge pull request #51 from ascmitc/dev/split-directory-hashes
Merge code containing the split of data and structure hashes. Directories now contain two hash values - one for the file system structure and one for the contained data.
jwaggs authored Jun 15, 2021
2 parents c524fd5 + ec4992a commit 87d4c19
Showing 49 changed files with 756 additions and 180 deletions.
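
The heart of the change: a directory now gets two hashes — a content hash computed from the data hashes of the contained files, and a structure hash that additionally covers the item names. The sketch below illustrates the idea only; the helper names are hypothetical, the hash format is fixed to SHA-256 for brevity, and this is not the actual DirectoryHashContext logic from this patch.

```python
# Illustrative sketch only -- not the normative ASC MHL algorithm and not
# the DirectoryHashContext implementation from this patch.
import hashlib
import os


def file_hash(path):
    # digest the raw file data in chunks
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def directory_hashes(path):
    # returns (content_hash, structure_hash) for a directory tree:
    # the content hash digests only the contained data hashes, while the
    # structure hash additionally digests the item names
    content = hashlib.sha256()
    structure = hashlib.sha256()
    for name in sorted(os.listdir(path)):
        item = os.path.join(path, name)
        if os.path.isdir(item):
            child_content, child_structure = directory_hashes(item)
            content.update(child_content.encode())
            structure.update(name.encode())
            structure.update(child_structure.encode())
        else:
            data_hash = file_hash(item)
            content.update(data_hash.encode())
            structure.update(name.encode())
            structure.update(data_hash.encode())
    return content.hexdigest(), structure.hexdigest()
```

In the real code, `DirectoryHashContext` accumulates these values per folder during a post-order traversal, and `append_directory_hashes` records both into the new generation (see ascmhl/commands.py and ascmhl/generator.py below).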
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,3 +10,5 @@ xsd/xsdvi.log

docs/build
docs/docenv
.coverage
htmlcov
2 changes: 1 addition & 1 deletion README.md
@@ -295,7 +295,7 @@ on error (including mismatching hashes):
```


#### `verify` with `-dh` subcommand option (for directory hash) _[not implemented yet]_
#### `verify` with `-dh` subcommand option (for directory hash)

The `verify` command with the `-dh` subcommand option creates the directory hash by hashing the contained files of the given directory path (filtered by the ignore patterns from the `ascmhl` folder) and compares it with the to-be-expected directory hash calculated from the file hashes (same calculation as the `info` command with the `-dh` subcommand option).

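For reference, an invocation might look like this (path is illustrative; as implemented below, `-h` is optional and defaults to the hash format of the latest generation containing directory hashes, falling back to `c4`):

```
$ ascmhl verify -dh -h c4 /path/to/folder
```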
237 changes: 219 additions & 18 deletions ascmhl/commands.py
@@ -91,16 +91,14 @@ def create(root_path, verbose, hash_format, no_directory_hashes, single_file, ig
"""
# distinguish different behavior for entire folder vs single files
if single_file is not None and len(single_file) > 0:
create_for_single_files_subcommand(root_path, verbose, hash_format, no_directory_hashes, single_file)
create_for_single_files_subcommand(root_path, verbose, hash_format, single_file, ignore_list, ignore_spec_file)
return
create_for_folder_subcommand(
root_path, verbose, hash_format, no_directory_hashes, single_file, ignore_list, ignore_spec_file
)
create_for_folder_subcommand(root_path, verbose, hash_format, no_directory_hashes, ignore_list, ignore_spec_file)
return


def create_for_folder_subcommand(
root_path, verbose, hash_format, no_directory_hashes, single_file, ignore_list=None, ignore_spec_file=None
root_path, verbose, hash_format, no_directory_hashes, ignore_list=None, ignore_spec_file=None
):
# command formerly known as "seal"
"""
@@ -134,8 +132,8 @@ def create_for_folder_subcommand(

num_failed_verifications = 0
# store the directory hashes of sub folders so we can use it when calculating the hash of the parent folder
dir_hash_mappings = {}

dir_content_hash_mappings = {}
dir_structure_hash_mappings = {}
for folder_path, children in post_order_lexicographic(root_path, session.ignore_spec.get_path_spec()):
# generate directory hashes
dir_hash_context = None
@@ -147,19 +145,27 @@ def create_for_folder_subcommand(
if is_dir:
if not dir_hash_context:
continue
hash_string = dir_hash_mappings.pop(file_path)
if dir_hash_context:
dir_hash_context.append_directory_hashes(
file_path, dir_content_hash_mappings.pop(file_path), dir_structure_hash_mappings.pop(file_path)
)
else:
hash_string, success = seal_file_path(existing_history, file_path, hash_format, session)
if not success:
num_failed_verifications += 1
if dir_hash_context:
dir_hash_context.append_hash(hash_string, item_name)
dir_hash = None
if not no_directory_hashes:
dir_hash_context.append_file_hash(file_path, hash_string)
dir_content_hash = None
dir_structure_hash = None
if dir_hash_context:
dir_hash = dir_hash_context.final_hash_str()
dir_hash_mappings[folder_path] = dir_hash
dir_content_hash = dir_hash_context.final_content_hash_str()
dir_structure_hash = dir_hash_context.final_structure_hash_str()
dir_content_hash_mappings[folder_path] = dir_content_hash
dir_structure_hash_mappings[folder_path] = dir_structure_hash
modification_date = datetime.datetime.fromtimestamp(os.path.getmtime(folder_path))
session.append_directory_hash(folder_path, modification_date, hash_format, dir_hash)
session.append_directory_hashes(
folder_path, modification_date, hash_format, dir_content_hash, dir_structure_hash
)

commit_session(session)

@@ -171,7 +177,9 @@ def create_for_folder_subcommand(
raise exception


def create_for_single_files_subcommand(root_path, verbose, hash_format, no_directory_hashes, single_file):
def create_for_single_files_subcommand(
root_path, verbose, hash_format, single_file, ignore_list=None, ignore_spec_file=None
):
# command formerly known as "record"
"""
Creates a new generation with the given file(s) or folder(s).
@@ -247,7 +255,16 @@ def create_for_single_files_subcommand(root_path, verbose, hash_format, no_direc
type=click.Path(exists=True),
help="A file containing multiple file patterns to ignore.",
)
def verify(root_path, verbose, ignore_list, ignore_spec_file):
# subcommand
@click.option(
"--directory_hash",
"-dh",
default=False,
is_flag=True,
help="Record single file, no completeness check (multiple occurrences possible for adding multiple files",
)
@click.option("--hash_format", "-h", type=click.Choice(ascmhl_supported_hashformats), multiple=False, help="Algorithm")
def verify(root_path, verbose, directory_hash, hash_format, ignore_list, ignore_spec_file):
"""
Verify a folder, single file(s), or a directory hash
@@ -258,7 +275,10 @@ def verify(root_path, verbose, ignore_list, ignore_spec_file):
files in the file system are reported as errors. No new ASC MHL file /
generation is created.
"""
# TODO distinguish different behavior
if directory_hash is True:
verify_directory_hash_subcommand(root_path, verbose, hash_format, ignore_list, ignore_spec_file)
return

verify_entire_folder_against_full_history_subcommand(root_path, verbose, ignore_list, ignore_spec_file)
return

@@ -337,8 +357,181 @@ def verify_entire_folder_against_full_history_subcommand(root_path, verbose, ign
raise exception


def verify_directory_hash_subcommand(root_path, verbose, hash_format, ignore_list=None, ignore_spec_file=None):
"""
Checks MHL directory hashes from all generations against computed directory hashes.
ROOT_PATH: the root path to use for the asc mhl history
    Traverses through the content of a folder, hashes all found files, creates directory hashes, and compares
("verifies") the hashes against the directory hash records in the asc-mhl folder.
Content directory hashes and structure directory hashes are compared individually.
"""
logger.verbose_logging = verbose

if not os.path.isabs(root_path):
root_path = os.path.join(os.getcwd(), root_path)

logger.verbose(f"check folder at path: {root_path}")

existing_history = MHLHistory.load_from_path(root_path)

ignore_spec = ignore.MHLIgnoreSpec(existing_history.latest_ignore_patterns(), ignore_list, ignore_spec_file)

# choose the hash format of the latest root directory hash
if hash_format is None:
generation = -1
for hash_list in existing_history.hash_lists:
            if hash_list.generation_number > generation:
                if len(hash_list.process_info.root_media_hash.hash_entries) > 0:
                    hash_format = hash_list.process_info.root_media_hash.hash_entries[0].hash_format
                    generation = hash_list.generation_number

if hash_format is None:
logger.verbose(f"default hash format: c4")
hash_format = "c4"
else:
logger.verbose(f"hash format from latest generation with directory hashes: {hash_format}")
else:
logger.verbose(f"hash format: {hash_format}")

# start a verification session on the existing history
session = MHLGenerationCreationSession(existing_history)

num_failed_verifications = 0
# store the directory hashes of sub folders so we can use it when calculating the hash of the parent folder
dir_content_hash_mappings = {}
dir_structure_hash_mappings = {}
for folder_path, children in post_order_lexicographic(root_path, ignore_spec.get_path_spec()):
# generate directory hashes
        dir_hash_context = DirectoryHashContext(hash_format)
for item_name, is_dir in children:
file_path = os.path.join(folder_path, item_name)
if is_dir:
relative_path = existing_history.get_relative_file_path(file_path)
history, history_relative_path = existing_history.find_history_for_path(relative_path)
# check if there are directory hashes in the generations
directory_hash_entries = history.find_directory_hash_entries_for_path(history_relative_path)

content_hash = dir_content_hash_mappings.pop(file_path)
structure_hash = dir_structure_hash_mappings.pop(file_path)
dir_hash_context.append_directory_hashes(file_path, content_hash, structure_hash)

num_successful_verifications = 0
found_hash_format = False
for directory_hash_entry in directory_hash_entries:
if directory_hash_entry.hash_format != hash_format:
continue
found_hash_format = True
num_current_successful_verifications = _compare_and_log_directory_hashes(
relative_path, directory_hash_entry, content_hash, structure_hash
)
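                    # _compare_and_log_directory_hashes returns 2 when both the content
                    # and the structure hash match, and 1 when at least one mismatches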
if num_current_successful_verifications == 2:
num_successful_verifications += 1
if num_current_successful_verifications == 1:
num_failed_verifications += 1

if not found_hash_format:
logger.error(
f"ERROR: verification of folder {relative_path}: No directory hash of type {hash_format} found"
)
num_failed_verifications += 1
else:
hash_string = hash_file_path(existing_history, file_path, hash_format, session)
dir_hash_context.append_file_hash(file_path, hash_string)
dir_content_hash = None
dir_structure_hash = None
if dir_hash_context:
dir_content_hash = dir_hash_context.final_content_hash_str()
dir_structure_hash = dir_hash_context.final_structure_hash_str()
dir_content_hash_mappings[folder_path] = dir_content_hash
dir_structure_hash_mappings[folder_path] = dir_structure_hash
modification_date = datetime.datetime.fromtimestamp(os.path.getmtime(folder_path))
session.append_directory_hashes(
folder_path, modification_date, hash_format, dir_content_hash, dir_structure_hash
)

        # the root folder is a special case: its hashes are compared against the
        # root media hash entries recorded in each generation
if folder_path == root_path:
found_hash_format = False
for hash_list in existing_history.hash_lists:
root_hash_entries = hash_list.process_info.root_media_hash.hash_entries
if len(root_hash_entries) > 0:
for root_hash_entry in root_hash_entries:
if root_hash_entry.hash_format == hash_format:
_compare_and_log_directory_hashes(
".", root_hash_entry, dir_content_hash, dir_structure_hash
)
found_hash_format = True
if not found_hash_format:
logger.error(f"ERROR: verification of root folder: No directory hash of type {hash_format} found")

exception = None
if num_failed_verifications > 0:
exception = errors.VerificationDirectoriesFailedException()

if exception:
raise exception


def _compare_and_log_directory_hashes(
relative_path, directory_hash_entry, calculated_content_hash_string, calculated_structure_hash_string
):
num_successful_verifications = 0
root_string = ""
if hasattr(directory_hash_entry, "temp_is_root_folder") and directory_hash_entry.temp_is_root_folder:
root_string = " (root folder in child history)"
if (
directory_hash_entry.hash_string == calculated_content_hash_string
and directory_hash_entry.structure_hash_string == calculated_structure_hash_string
):
if relative_path == ".":
logger.verbose(
f" verification of root folder OK (generation {directory_hash_entry.temp_generation_number:04d})"
)
else:
logger.verbose(
f" verification of folder {relative_path}{root_string} OK "
f"(generation {directory_hash_entry.temp_generation_number:04d})"
)

num_successful_verifications += 2
else:
if directory_hash_entry.hash_string != calculated_content_hash_string:
logger.error(
f"ERROR: content hash mismatch for {relative_path}{root_string} "
f"old {directory_hash_entry.hash_format}: {directory_hash_entry.hash_string}, "
f"new {directory_hash_entry.hash_format}: {calculated_content_hash_string} "
f"(generation {directory_hash_entry.temp_generation_number:04d})"
)
else:
logger.verbose(
f" content hash matches for {relative_path}{root_string} "
f" {directory_hash_entry.hash_format}: {directory_hash_entry.hash_string}"
f" (generation {directory_hash_entry.temp_generation_number:04d})"
)

if directory_hash_entry.structure_hash_string != calculated_structure_hash_string:
logger.error(
f"ERROR: structure hash mismatch for {relative_path}{root_string} "
f"old {directory_hash_entry.hash_format}: {directory_hash_entry.structure_hash_string}, "
f"new {directory_hash_entry.hash_format}: {calculated_structure_hash_string} "
f"(generation {directory_hash_entry.temp_generation_number:04d})"
)
else:
logger.verbose(
f" structure hash matches for {relative_path}{root_string} "
f" {directory_hash_entry.hash_format}: {directory_hash_entry.hash_string} "
f" (generation {directory_hash_entry.temp_generation_number:04d})"
)

num_successful_verifications += 1

return num_successful_verifications


# TODO def verify_single_file_subcommand(root_path, verbose):
# TODO def verify_directory_hash_subcommand(root_path, verbose):


@click.command()
@@ -670,3 +863,11 @@ def seal_file_path(existing_history, file_path, hash_format, session) -> (str, b
return current_format_hash, False
success &= session.append_file_hash(file_path, file_size, file_modification_date, hash_format, current_format_hash)
return current_format_hash, success


def hash_file_path(existing_history, file_path, hash_format, session) -> str:
    """Hashes a single file and returns the hash string; unlike seal_file_path above,
    it does not append the result to the new generation."""
    current_format_hash = create_filehash(hash_format, file_path)
    relative_path = session.root_history.get_relative_file_path(file_path)
    logger.verbose(f"  created file hash for {relative_path}")

    return current_format_hash
7 changes: 7 additions & 0 deletions ascmhl/errors.py
@@ -31,6 +31,13 @@ def __init__(self):
super().__init__("Verification of files referenced in the ASC MHL history failed")


class VerificationDirectoriesFailedException(click.ClickException):
exit_code = 15

def __init__(self):
super().__init__("Verification of directories referenced in the ASC MHL history failed")


class NewFilesFoundException(click.ClickException):
exit_code = 13

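Because the new exception carries a dedicated exit code, wrapper scripts can distinguish directory-hash failures from other verification errors. A minimal sketch, assuming an `ascmhl` executable on the PATH and a hypothetical mount point:

```
ascmhl verify -dh /mnt/travel_drive
if [ $? -eq 15 ]; then
    echo "ASC MHL directory hash verification failed"
fi
```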
28 changes: 21 additions & 7 deletions ascmhl/generator.py
@@ -83,7 +83,9 @@ def append_file_hash(self, file_path, file_size, file_modification_date, hash_fo
media_hash.append_hash_entry(hash_entry)
return hash_entry.action != "failed"

def append_directory_hash(self, path, modification_date, hash_format, hash_string) -> None:
def append_directory_hashes(
self, path, modification_date, hash_format, content_hash_string, structure_hash_string
) -> None:

relative_path = self.root_history.get_relative_file_path(path)
# TODO: handle if path is outside of history root path
@@ -95,12 +97,22 @@ def append_directory_hash(self, path, modification_date, hash_format, hash_strin
media_hash = new_hash_list.find_or_create_media_hash_for_path(history_relative_path, None, modification_date)
media_hash.is_directory = True

if hash_string:
media_hash.append_hash_entry(MHLHashEntry(hash_format, hash_string))
if content_hash_string:
hash_entry = MHLHashEntry(hash_format, content_hash_string)
hash_entry.structure_hash_string = structure_hash_string
media_hash.append_hash_entry(hash_entry)
if relative_path == ".":
logger.verbose(f" calculated root hash {hash_format}: {hash_string}")
logger.verbose(
f" calculated root hash {hash_format}: "
f"{content_hash_string} (content), "
f"{structure_hash_string} (structure)"
)
else:
logger.verbose(f" calculated directory hash for {relative_path} {hash_format}: {hash_string}")
logger.verbose(
f" calculated directory hash for {relative_path} {hash_format}: "
f"{content_hash_string} (content), "
f"{structure_hash_string} (structure)"
)
else:
logger.verbose(f" added directory entry for {relative_path}")

@@ -113,8 +125,10 @@ def append_directory_hash(self, path, modification_date, hash_format, hash_strin
parent_relative_path, None, modification_date
)
parent_media_hash.is_directory = True
if hash_string:
parent_media_hash.append_hash_entry(MHLHashEntry(hash_format, hash_string))
if content_hash_string:
hash_entry = MHLHashEntry(hash_format, content_hash_string)
hash_entry.structure_hash_string = structure_hash_string
parent_media_hash.append_hash_entry(hash_entry)

def commit(self, creator_info: MHLCreatorInfo, process_info: MHLProcessInfo):
"""
