diff --git a/poetry.lock b/poetry.lock
index 202cf4c405..070af38404 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -195,6 +195,17 @@ python-versions = ">=3.6"
 docs = ["Sphinx (>=4)", "furo (>=2021.7.5b38)", "proselint (>=0.10.2)", "sphinx-autodoc-typehints (>=1.12)"]
 test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock (>=3.6)"]
 
+[[package]]
+name = "plotext"
+version = "4.1.5"
+description = "plotext plots directly on terminal"
+category = "main"
+optional = false
+python-versions = ">=3.5"
+
+[package.extras]
+image = ["pillow (>=8.4)"]
+
 [[package]]
 name = "pluggy"
 version = "1.0.0"
@@ -432,7 +443,7 @@ python-versions = "*"
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.8"
-content-hash = "ffe9987608cea86289af4f4b3ecbfddcd4e89606d21b63d7e8c08ef91c7efaa1"
+content-hash = "cbb70ba541d2367667f28704be10d4ec881916a6dc4f391c3c6b05af21bb159e"
 
 [metadata.files]
 arpy = [
@@ -549,6 +560,10 @@ platformdirs = [
     {file = "platformdirs-2.4.0-py3-none-any.whl", hash = "sha256:8868bbe3c3c80d42f20156f22e7131d2fb321f5bc86a2a345375c6481a67021d"},
     {file = "platformdirs-2.4.0.tar.gz", hash = "sha256:367a5e80b3d04d2428ffa76d33f124cf11e8fff2acdaa9b43d545f5c7d661ef2"},
 ]
+plotext = [
+    {file = "plotext-4.1.5-py3-none-any.whl", hash = "sha256:60395f4f764194f04681a1abd70379410f1da756a697a9727b0e64a862c62de0"},
+    {file = "plotext-4.1.5.tar.gz", hash = "sha256:ba0a2870f55b4bbc75a06228b4779bdfd2608f1f43ceb4f8851d9f9a63923ef8"},
+]
 pluggy = [
     {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"},
     {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"},
diff --git a/pyproject.toml b/pyproject.toml
index f911ffa7c5..4c5bb5c479 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,7 +24,7 @@ python-lzo = "^1.12"
 cstruct = "2.1"
 jefferson = { git = "https://github.com/IoT-Inspector/jefferson.git", rev = "216eee6c56d338e5a14a0af5d07f1117cff92b3b" }
 yaffshiv = { git = "https://github.com/IoT-Inspector/yaffshiv.git", rev = "24e6e453a36a02144ae2d159eb3229f9c6312828" }
-
+plotext = "^4.1.5"
 
 [tool.poetry.dev-dependencies]
 lark = "^1.0.0"
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 9846baa9c7..38c83d7389 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -113,16 +113,20 @@ def test_empty_dir_as_file(tmp_path: Path):
 
 
 @pytest.mark.parametrize(
-    "params, expected_depth, expected_verbosity",
+    "params, expected_depth, expected_entropy_depth, expected_verbosity",
     (
-        pytest.param([], DEFAULT_DEPTH, False, id="empty"),
-        pytest.param(["--verbose"], DEFAULT_DEPTH, True, id="verbose"),
-        pytest.param(["--depth", "2"], 2, False, id="depth"),
-        pytest.param(["--verbose", "--depth", "2"], 2, True, id="verbose+depth"),
+        pytest.param([], DEFAULT_DEPTH, 1, False, id="empty"),
+        pytest.param(["--verbose"], DEFAULT_DEPTH, 1, True, id="verbose"),
+        pytest.param(["--depth", "2"], 2, 1, False, id="depth"),
+        pytest.param(["--verbose", "--depth", "2"], 2, 1, True, id="verbose+depth"),
     ),
 )
 def test_archive_success(
-    params, expected_depth: int, expected_verbosity: bool, tmp_path: Path
+    params,
+    expected_depth: int,
+    expected_entropy_depth: int,
+    expected_verbosity: bool,
+    tmp_path: Path,
 ):
     runner = CliRunner()
     in_path = (
@@ -144,7 +148,12 @@ def test_archive_success(
     assert "error" not in result.output
     assert "warning" not in result.output
     process_file_mock.assert_called_once_with(
-        in_path, in_path, tmp_path, max_depth=expected_depth
+        in_path,
+        in_path,
+        tmp_path,
+        max_depth=expected_depth,
+        entropy_depth=expected_entropy_depth,
+        verbose=expected_verbosity,
     )
 
     logger_config_mock.assert_called_once_with(expected_verbosity, tmp_path)
@@ -176,6 +185,20 @@ def test_archive_multiple_files(tmp_path: Path):
     assert result.exit_code == 0
     assert process_file_mock.call_count == 2
     assert process_file_mock.call_args_list == [
-        mock.call(in_path_1, in_path_1, tmp_path, max_depth=DEFAULT_DEPTH),
-        mock.call(in_path_2, in_path_2, tmp_path, max_depth=DEFAULT_DEPTH),
+        mock.call(
+            in_path_1,
+            in_path_1,
+            tmp_path,
+            max_depth=DEFAULT_DEPTH,
+            entropy_depth=1,
+            verbose=False,
+        ),
+        mock.call(
+            in_path_2,
+            in_path_2,
+            tmp_path,
+            max_depth=DEFAULT_DEPTH,
+            entropy_depth=1,
+            verbose=False,
+        ),
     ]
diff --git a/tests/test_handlers.py b/tests/test_handlers.py
index b7bd5482e8..2c3c8712bb 100644
--- a/tests/test_handlers.py
+++ b/tests/test_handlers.py
@@ -44,6 +44,7 @@ def test_all_handlers(input_dir: Path, output_dir: Path, tmp_path: Path):
         path=input_dir,
         extract_root=tmp_path,
         max_depth=DEFAULT_DEPTH,
+        entropy_depth=0,
     )
 
     diff_command = [
diff --git a/unblob/cli.py b/unblob/cli.py
index dfcc040e14..e5f5d0734a 100644
--- a/unblob/cli.py
+++ b/unblob/cli.py
@@ -60,10 +60,22 @@ def get_help_text():
 @click.option(
     "-d",
     "--depth",
-    type=int,
     default=DEFAULT_DEPTH,
+    type=click.IntRange(1),
+    show_default=True,
     help="Recursion depth. How deep should we extract containers.",
 )
+@click.option(
+    "-n",
+    "--entropy-depth",
+    type=click.IntRange(0),
+    default=1,
+    show_default=True,
+    help=(
+        "Entropy calculation depth. How deep should we calculate entropy for unknown files? "
+        "1 means input files only, 0 turns it off."
+    ),
+)
 @click.option("-v", "--verbose", is_flag=True, help="Verbose mode, enable debug logs.")
 @click.option(
     "--show-external-dependencies",
@@ -73,12 +85,25 @@ def get_help_text():
     expose_value=False,
     is_eager=True,
 )
-def cli(files: Tuple[Path], extract_root: Path, depth: int, verbose: bool):
+def cli(
+    files: Tuple[Path],
+    extract_root: Path,
+    depth: int,
+    entropy_depth: int,
+    verbose: bool,
+):
     configure_logger(verbose, extract_root)
     logger.info("Start processing files", count=noformat(len(files)))
     for path in files:
         root = path if path.is_dir() else path.parent
-        process_file(root, path, extract_root, max_depth=depth)
+        process_file(
+            root,
+            path,
+            extract_root,
+            max_depth=depth,
+            entropy_depth=entropy_depth,
+            verbose=verbose,
+        )
 
 
 def main():
diff --git a/unblob/extractor.py b/unblob/extractor.py
index f5102179ca..97085e332d 100644
--- a/unblob/extractor.py
+++ b/unblob/extractor.py
@@ -5,7 +5,7 @@
 import shlex
 import subprocess
 from pathlib import Path
-from typing import Iterator, List
+from typing import List
 
 from structlog import get_logger
 
@@ -68,25 +68,29 @@ def extract_with_command(
 
 def carve_unknown_chunks(
     extract_dir: Path, file: io.BufferedIOBase, unknown_chunks: List[UnknownChunk]
-):
+) -> List[Path]:
     if not unknown_chunks:
-        return
+        return []
 
+    carved_paths = []
     logger.warning("Found unknown Chunks", chunks=unknown_chunks)
+
     for chunk in unknown_chunks:
         filename = f"{chunk.start_offset}-{chunk.end_offset}.unknown"
         carve_path = extract_dir / filename
         logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
         carve_chunk_to_file(carve_path, file, chunk)
+        carved_paths.append(carve_path)
 
+    return carved_paths
 
-def extract_valid_chunks(
-    extract_dir: Path, file: io.BufferedIOBase, valid_chunks: List[ValidChunk]
-) -> Iterator[Path]:
-    for chunk in valid_chunks:
-        filename = f"{chunk.start_offset}-{chunk.end_offset}.{chunk.handler.NAME}"
-        carve_path = extract_dir / filename
-        logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
-        carve_chunk_to_file(carve_path, file, chunk)
-        extracted = extract_with_command(extract_dir, carve_path, chunk.handler)
-        yield extracted
+
+def extract_valid_chunk(
+    extract_dir: Path, file: io.BufferedIOBase, chunk: ValidChunk
+) -> Path:
+    filename = f"{chunk.start_offset}-{chunk.end_offset}.{chunk.handler.NAME}"
+    carve_path = extract_dir / filename
+    logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
+    carve_chunk_to_file(carve_path, file, chunk)
+    extracted = extract_with_command(extract_dir, carve_path, chunk.handler)
+    return extracted
diff --git a/unblob/finder.py b/unblob/finder.py
index 104c20dd40..39424ba15e 100644
--- a/unblob/finder.py
+++ b/unblob/finder.py
@@ -111,6 +111,8 @@ def search_chunks_by_priority(  # noqa: C901
             logger.info("Found valid chunk", chunk=chunk, handler=handler.NAME)
             all_chunks.append(chunk)
 
+        logger.info("Ended priority level", priority_level=noformat(priority_level))
+
     return all_chunks
 
 
diff --git a/unblob/math.py b/unblob/math.py
new file mode 100644
index 0000000000..65403629ac
--- /dev/null
+++ b/unblob/math.py
@@ -0,0 +1,4 @@
+try:
+    from ._rust import shannon_entropy
+except ImportError:
+    from ._py.math import shannon_entropy  # noqa: F401
diff --git a/unblob/processing.py b/unblob/processing.py
index 3f19865aa7..67298a6e66 100644
--- a/unblob/processing.py
+++ b/unblob/processing.py
@@ -1,14 +1,18 @@
 import stat
+import statistics
 from operator import attrgetter
 from pathlib import Path
 from typing import List
 
+import plotext as plt
 from structlog import get_logger
 
-from .extractor import carve_unknown_chunks, extract_valid_chunks, make_extract_dir
+from .extractor import carve_unknown_chunks, extract_valid_chunk, make_extract_dir
+from .file_utils import iterate_file
 from .finder import search_chunks_by_priority
 from .iter_utils import pairwise
 from .logging import noformat
+from .math import shannon_entropy
 from .models import UnknownChunk, ValidChunk
 
 logger = get_logger()
@@ -16,11 +20,15 @@
 DEFAULT_DEPTH = 10
 
 
-def process_file(
+# TODO: this function became too complex when adding entropy calculation, but
+# it will be simplified in a separate branch, because the refactor is very complex
+def process_file(  # noqa: C901
     root: Path,
     path: Path,
     extract_root: Path,
     max_depth: int,
+    entropy_depth: int,
+    verbose: bool = False,
     current_depth: int = 0,
 ):
     log = logger.bind(path=path)
@@ -36,7 +44,15 @@
     if stat.S_ISDIR(mode):
         log.info("Found directory")
         for path in path.iterdir():
-            process_file(root, path, extract_root, max_depth, current_depth + 1)
+            process_file(
+                root,
+                path,
+                extract_root,
+                max_depth,
+                entropy_depth,
+                verbose,
+                current_depth + 1,
+            )
         return
 
     elif stat.S_ISLNK(mode):
@@ -54,13 +70,29 @@
         outer_chunks = remove_inner_chunks(all_chunks)
         unknown_chunks = calculate_unknown_chunks(outer_chunks, size)
         if not outer_chunks and not unknown_chunks:
+            # we don't consider whole files as unknown chunks, but we still want to
+            # calculate entropy for whole files which produced no valid chunks
+            if current_depth < entropy_depth:
+                calculate_entropy(path, draw_plot=verbose)
             return
 
         extract_dir = make_extract_dir(root, path, extract_root)
-        carve_unknown_chunks(extract_dir, file, unknown_chunks)
-        for new_path in extract_valid_chunks(extract_dir, file, outer_chunks):
+
+        carved_paths = carve_unknown_chunks(extract_dir, file, unknown_chunks)
+        if current_depth < entropy_depth:
+            for carved_path in carved_paths:
+                calculate_entropy(carved_path, draw_plot=verbose)
+
+        for chunk in outer_chunks:
+            new_path = extract_valid_chunk(extract_dir, file, chunk)
             process_file(
-                extract_root, new_path, extract_root, max_depth, current_depth + 1
+                extract_root,
+                new_path,
+                extract_root,
+                max_depth,
+                entropy_depth,
+                verbose,
+                current_depth + 1,
             )
 
 
@@ -119,3 +151,72 @@
         unknown_chunks.append(unknown_chunk)
 
     return unknown_chunks
+
+
+def calculate_entropy(path: Path, *, draw_plot: bool):
+    """Calculate and log Shannon entropy divided by 8 for the file in 1 MB chunks.
+
+    Shannon entropy returns the amount of information (in bits) of some numeric
+    sequence. We calculate the average entropy of byte chunks, which in theory
+    can contain 0-8 bits of entropy. We normalize it for visualization to a
+    0-100% scale, to make it easier to interpret the graph.
+    """
+    percentages = []
+
+    # We could use the chunk size instead of another syscall,
+    # but we rely on the actual file size written to the disk
+    file_size = path.stat().st_size
+    logger.info("Calculating entropy for file", path=path, size=file_size)
+
+    # Smaller chunk sizes would make the calculation very slow.
+    # A 1 MB chunk size takes ~3 sec for a 4.5 GB file.
+    buffer_size = calculate_buffer_size(
+        file_size, chunk_count=80, min_limit=1024, max_limit=1024 * 1024
+    )
+
+    with path.open("rb") as file:
+        for chunk in iterate_file(file, 0, file_size, buffer_size=buffer_size):
+            entropy = shannon_entropy(chunk)
+            entropy_percentage = round(entropy / 8 * 100, 2)
+            percentages.append(entropy_percentage)
+
+    logger.info(
+        "Entropy calculated",
+        mean=round(statistics.mean(percentages), 2),
+        highest=max(percentages),
+        lowest=min(percentages),
+    )
+
+    if draw_plot:
+        draw_entropy_plot(percentages)
+
+
+def calculate_buffer_size(
+    file_size, *, chunk_count: int, min_limit: int, max_limit: int
+) -> int:
+    """Split the file into even-sized chunks, limited by lower and upper values."""
+    # We don't care about floating point precision here
+    buffer_size = file_size // chunk_count
+    buffer_size = max(min_limit, buffer_size)
+    buffer_size = min(buffer_size, max_limit)
+    return buffer_size
+
+
+def draw_entropy_plot(percentages: List[float]):
+    plt.clear_data()
+    plt.colorless()
+    plt.title("Entropy distribution")
+    plt.xlabel("MB")
+    plt.ylabel("entropy %")
+
+    plt.scatter(percentages, marker="dot")
+    # A height of 16 leaves no gaps between the lines
+    plt.plot_size(100, 16)
+    plt.ylim(0, 100)
+    # Draw ticks every 1 MB on the x axis.
+    plt.xticks(range(len(percentages) + 1))
+    # Always show 0% and 100%
+    plt.yticks(range(0, 101, 10))
+
+    # Newline so that the chart title is aligned correctly on the next line
+    logger.debug("Entropy chart", chart="\n" + plt.build())
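
For reference: the new unblob/math.py prefers a native _rust extension and falls
back to a pure-Python unblob/_py/math.py, whose body is not part of this diff.
A minimal sketch of what such a fallback computes (Shannon entropy of a byte
string, in bits per byte, 0.0 up to 8.0) could look like the following; this is
an illustrative equivalent, not the project's actual code:

    import math
    from collections import Counter

    def shannon_entropy(data: bytes) -> float:
        """Shannon entropy of `data` in bits per byte (0.0 for constant data, up to 8.0)."""
        if not data:
            return 0.0
        length = len(data)
        # Sum -p * log2(p) over the probability p of each byte value
        return -sum(
            (count / length) * math.log2(count / length)
            for count in Counter(data).values()
        )

calculate_entropy() in unblob/processing.py then divides this 0-8 bit value by
8 and scales it to the 0-100% range that draw_entropy_plot() expects.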
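The clamping in calculate_buffer_size() targets roughly chunk_count reads per
file while keeping each read between min_limit and max_limit. Worked through
with the constants calculate_entropy() passes in (chunk_count=80,
min_limit=1024, max_limit=1024 * 1024):

    # A 10 kB file: 10_000 // 80 == 125, raised to the 1 KiB floor
    calculate_buffer_size(10_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)          # -> 1024
    # An 8 MB file: 8_000_000 // 80 == 100_000, already within limits (~80 chunks)
    calculate_buffer_size(8_000_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)       # -> 100_000
    # A 4.5 GB file: 4_500_000_000 // 80 == 56_250_000, capped at the 1 MiB ceiling
    calculate_buffer_size(4_500_000_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)   # -> 1_048_576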