Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate entropy #199

Merged
merged 7 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ python-lzo = "^1.12"
cstruct = "2.1"
jefferson = { git = "https://github.com/IoT-Inspector/jefferson.git", rev = "216eee6c56d338e5a14a0af5d07f1117cff92b3b" }
yaffshiv = { git = "https://github.com/IoT-Inspector/yaffshiv.git", rev = "24e6e453a36a02144ae2d159eb3229f9c6312828" }

plotext = "^4.1.5"

[tool.poetry.dev-dependencies]
lark = "^1.0.0"
Expand Down
41 changes: 32 additions & 9 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,20 @@ def test_empty_dir_as_file(tmp_path: Path):


@pytest.mark.parametrize(
"params, expected_depth, expected_verbosity",
"params, expected_depth, expected_entropy_depth, expected_verbosity",
(
pytest.param([], DEFAULT_DEPTH, False, id="empty"),
pytest.param(["--verbose"], DEFAULT_DEPTH, True, id="verbose"),
pytest.param(["--depth", "2"], 2, False, id="depth"),
pytest.param(["--verbose", "--depth", "2"], 2, True, id="verbose+depth"),
pytest.param([], DEFAULT_DEPTH, 1, False, id="empty"),
pytest.param(["--verbose"], DEFAULT_DEPTH, 1, True, id="verbose"),
pytest.param(["--depth", "2"], 2, 1, False, id="depth"),
pytest.param(["--verbose", "--depth", "2"], 2, 1, True, id="verbose+depth"),
),
)
def test_archive_success(
params, expected_depth: int, expected_verbosity: bool, tmp_path: Path
params,
expected_depth: int,
expected_entropy_depth: int,
expected_verbosity: bool,
tmp_path: Path,
):
runner = CliRunner()
in_path = (
Expand All @@ -144,7 +148,12 @@ def test_archive_success(
assert "error" not in result.output
assert "warning" not in result.output
process_file_mock.assert_called_once_with(
in_path, in_path, tmp_path, max_depth=expected_depth
in_path,
in_path,
tmp_path,
max_depth=expected_depth,
entropy_depth=expected_entropy_depth,
verbose=expected_verbosity,
)
logger_config_mock.assert_called_once_with(expected_verbosity, tmp_path)

Expand Down Expand Up @@ -176,6 +185,20 @@ def test_archive_multiple_files(tmp_path: Path):
assert result.exit_code == 0
assert process_file_mock.call_count == 2
assert process_file_mock.call_args_list == [
mock.call(in_path_1, in_path_1, tmp_path, max_depth=DEFAULT_DEPTH),
mock.call(in_path_2, in_path_2, tmp_path, max_depth=DEFAULT_DEPTH),
mock.call(
in_path_1,
in_path_1,
tmp_path,
max_depth=DEFAULT_DEPTH,
entropy_depth=1,
verbose=False,
),
mock.call(
in_path_2,
in_path_2,
tmp_path,
max_depth=DEFAULT_DEPTH,
entropy_depth=1,
verbose=False,
),
]
1 change: 1 addition & 0 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_all_handlers(input_dir: Path, output_dir: Path, tmp_path: Path):
path=input_dir,
extract_root=tmp_path,
max_depth=DEFAULT_DEPTH,
entropy_depth=0,
)

diff_command = [
Expand Down
31 changes: 28 additions & 3 deletions unblob/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,22 @@ def get_help_text():
@click.option(
"-d",
"--depth",
type=int,
default=DEFAULT_DEPTH,
type=click.IntRange(1),
show_default=True,
help="Recursion depth. How deep should we extract containers.",
)
@click.option(
"-n",
"--entropy-depth",
type=click.IntRange(0),
default=1,
show_default=True,
help=(
"Entropy calculation depth. How deep should we calculate entropy for unknown files? "
"1 means input files only, 0 turns it off."
),
)
@click.option("-v", "--verbose", is_flag=True, help="Verbose mode, enable debug logs.")
@click.option(
"--show-external-dependencies",
Expand All @@ -73,12 +85,25 @@ def get_help_text():
expose_value=False,
is_eager=True,
)
def cli(
    files: Tuple[Path],
    extract_root: Path,
    depth: int,
    entropy_depth: int,
    verbose: bool,
):
    """CLI entry point: configure logging, then process every input path.

    NOTE(review): the rendered diff interleaved the removed one-line signature
    with the added multi-line one; this is the reconstructed post-change version.
    """
    configure_logger(verbose, extract_root)
    logger.info("Start processing files", count=noformat(len(files)))
    for path in files:
        # A directory is its own extraction root; a regular file's root is
        # its parent directory.
        root = path if path.is_dir() else path.parent
        process_file(
            root,
            path,
            extract_root,
            max_depth=depth,
            entropy_depth=entropy_depth,
            verbose=verbose,
        )


def main():
Expand Down
30 changes: 17 additions & 13 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import shlex
import subprocess
from pathlib import Path
from typing import Iterator, List
from typing import List

from structlog import get_logger

Expand Down Expand Up @@ -68,25 +68,29 @@ def extract_with_command(

def carve_unknown_chunks(
    extract_dir: Path, file: io.BufferedIOBase, unknown_chunks: List[UnknownChunk]
) -> List[Path]:
    """Carve every unknown chunk into its own ``.unknown`` file under extract_dir.

    Returns the list of carved file paths (empty when there is nothing to
    carve), so callers can run e.g. entropy calculation on them.

    NOTE(review): the rendered diff interleaved removed lines (``):`` and a
    bare ``return``) with their replacements; this is the reconstructed
    post-change version, which is valid Python.
    """
    if not unknown_chunks:
        return []

    carved_paths = []
    logger.warning("Found unknown Chunks", chunks=unknown_chunks)

    for chunk in unknown_chunks:
        # File name encodes the byte range the chunk occupied in the input.
        filename = f"{chunk.start_offset}-{chunk.end_offset}.unknown"
        carve_path = extract_dir / filename
        logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
        carve_chunk_to_file(carve_path, file, chunk)
        carved_paths.append(carve_path)

    return carved_paths

def extract_valid_chunks(
extract_dir: Path, file: io.BufferedIOBase, valid_chunks: List[ValidChunk]
) -> Iterator[Path]:
for chunk in valid_chunks:
filename = f"{chunk.start_offset}-{chunk.end_offset}.{chunk.handler.NAME}"
carve_path = extract_dir / filename
logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
carve_chunk_to_file(carve_path, file, chunk)
extracted = extract_with_command(extract_dir, carve_path, chunk.handler)
yield extracted

def extract_valid_chunk(
    extract_dir: Path, file: io.BufferedIOBase, chunk: ValidChunk
) -> Path:
    """Carve one valid chunk to its own file, run its handler's extractor on
    it, and return the path of the extracted output."""
    # File name encodes the byte range plus the handler that recognized it.
    carve_path = (
        extract_dir / f"{chunk.start_offset}-{chunk.end_offset}.{chunk.handler.NAME}"
    )
    logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return extract_with_command(extract_dir, carve_path, chunk.handler)
2 changes: 2 additions & 0 deletions unblob/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def search_chunks_by_priority( # noqa: C901
logger.info("Found valid chunk", chunk=chunk, handler=handler.NAME)
all_chunks.append(chunk)

logger.info("Ended priority level", priority_level=noformat(priority_level))

return all_chunks


Expand Down
4 changes: 4 additions & 0 deletions unblob/math.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Prefer the compiled Rust implementation of shannon_entropy when the native
# extension is available; otherwise fall back to the pure-Python version,
# which provides the same callable under the same name.
try:
    from ._rust import shannon_entropy
except ImportError:
    from ._py.math import shannon_entropy  # noqa: F401
113 changes: 107 additions & 6 deletions unblob/processing.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
import stat
import statistics
from operator import attrgetter
from pathlib import Path
from typing import List

import plotext as plt
from structlog import get_logger

from .extractor import carve_unknown_chunks, extract_valid_chunks, make_extract_dir
from .extractor import carve_unknown_chunks, extract_valid_chunk, make_extract_dir
from .file_utils import iterate_file
from .finder import search_chunks_by_priority
from .iter_utils import pairwise
from .logging import noformat
from .math import shannon_entropy
from .models import UnknownChunk, ValidChunk

logger = get_logger()

DEFAULT_DEPTH = 10


def process_file(
# TODO: this function became too complex when adding entropy calculation, but
# it will be simplified in a separate branch, because the refactor is very complex
def process_file( # noqa: C901
root: Path,
path: Path,
extract_root: Path,
max_depth: int,
entropy_depth: int,
verbose: bool = False,
current_depth: int = 0,
):
log = logger.bind(path=path)
Expand All @@ -36,7 +44,15 @@ def process_file(
if stat.S_ISDIR(mode):
log.info("Found directory")
for path in path.iterdir():
process_file(root, path, extract_root, max_depth, current_depth + 1)
process_file(
root,
path,
extract_root,
max_depth,
entropy_depth,
verbose,
current_depth + 1,
)
return

elif stat.S_ISLNK(mode):
Expand All @@ -54,13 +70,29 @@ def process_file(
outer_chunks = remove_inner_chunks(all_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, size)
if not outer_chunks and not unknown_chunks:
# we don't consider whole files as unknown chunks, but we still want to
# calculate entropy for whole files which produced no valid chunks
if current_depth < entropy_depth:
calculate_entropy(path, draw_plot=verbose)
return

extract_dir = make_extract_dir(root, path, extract_root)
carve_unknown_chunks(extract_dir, file, unknown_chunks)
for new_path in extract_valid_chunks(extract_dir, file, outer_chunks):

carved_paths = carve_unknown_chunks(extract_dir, file, unknown_chunks)
if current_depth < entropy_depth:
for carved_path in carved_paths:
calculate_entropy(carved_path, draw_plot=verbose)

for chunk in outer_chunks:
new_path = extract_valid_chunk(extract_dir, file, chunk)
process_file(
extract_root, new_path, extract_root, max_depth, current_depth + 1
extract_root,
new_path,
extract_root,
max_depth,
entropy_depth,
verbose,
current_depth + 1,
)


Expand Down Expand Up @@ -119,3 +151,72 @@ def calculate_unknown_chunks(
unknown_chunks.append(unknown_chunk)

return unknown_chunks


def calculate_entropy(path: Path, *, draw_plot: bool):
    """Calculate and log shannon entropy divided by 8 for the file in up-to-1MB chunks.

    Shannon entropy returns the amount of information (in bits) of some numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.
    """
    percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to the disk
    file_size = path.stat().st_size
    logger.info("Calculating entropy for file", path=path, size=file_size)

    if file_size == 0:
        # An empty file yields no chunks at all; statistics.mean/max/min would
        # raise on the empty sequence below, so bail out early.
        logger.info("Skipping entropy calculation for empty file", path=path)
        return

    # Smaller chunk size would be very slow to calculate.
    # 1Mb chunk size takes ~ 3sec for a 4,5 GB file.
    buffer_size = calculate_buffer_size(
        file_size, chunk_count=80, min_limit=1024, max_limit=1024 * 1024
    )

    with path.open("rb") as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=buffer_size):
            entropy = shannon_entropy(chunk)
            # Normalize 0-8 bits of entropy per byte to a 0-100% scale.
            entropy_percentage = round(entropy / 8 * 100, 2)
            percentages.append(entropy_percentage)

    logger.info(
        "Entropy calculated",
        mean=round(statistics.mean(percentages), 2),
        highest=max(percentages),
        lowest=min(percentages),
    )

    if draw_plot:
        draw_entropy_plot(percentages)


def calculate_buffer_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into even sized chunks, limited by lower and upper values."""
    # Integer division is good enough here; exact chunk boundaries don't matter.
    even_chunk_size = file_size // chunk_count
    # Clamp the computed size into the [min_limit, max_limit] interval.
    return min(max_limit, max(min_limit, even_chunk_size))


def draw_entropy_plot(percentages: List[float]):
    """Render the per-chunk entropy percentages as an ASCII scatter chart
    (via plotext) and emit it through the debug log."""
    # plotext keeps global state: reset any data left over from a previous plot.
    plt.clear_data()
    # Disable colors so the chart renders cleanly in plain-text log output.
    plt.colorless()
    plt.title("Entropy distribution")
    plt.xlabel("mB")
    plt.ylabel("entropy %")

    plt.scatter(percentages, marker="dot")
    # 16 height leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw ticks every 1Mb on the x axis.
    plt.xticks(range(len(percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    # New line so that chart title will be aligned correctly in the next line
    logger.debug("Entropy chart", chart="\n" + plt.build())