diff --git a/README.md b/README.md index 04c1f1f546..4659f008b8 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ unblob is a tool for getting information out of any kind of binary blob. ## Quickstart +### Using from Docker container + Unblob can be used right away from a `docker` container: \ `ghcr.io/iot-inspector/unblob:latest` @@ -15,7 +17,7 @@ docker run \ --rm \ --pull always \ -v /path/to/out/dir/on/host:/data/output \ - -v /path/to/files/on/host:/data/input \ + -v /path/to/input/files/on/host:/data/input \ ghcr.io/iot-inspector/unblob:latest /data/input/path/to/file ``` @@ -24,7 +26,31 @@ Help on usage: docker run --rm --pull always ghcr.io/iot-inspector/unblob:latest --help ``` - +### Using as a library + +```python +from pathlib import Path +import unblob + +files = Path("/path/to/input/file") +# or with multiple files +files = ( + Path("/path/to/input/file/1"), + Path("/path/to/input/file/2"), +) +extract_root = Path("/path/to/out/dir") +# optional parameters +depth = 5 +verbose = True + +return_code = unblob.unblob( + files=files, + extract_root=extract_root, + depth=depth, + verbose=verbose +) +print(f"return_code: {return_code}") +``` ## Development ### Dependencies diff --git a/tests/test_handlers.py b/tests/test_handlers.py index bd3aa495bf..89ec51288d 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -15,9 +15,8 @@ import pytest -from unblob import handlers +from unblob import handlers, unblob from unblob.models import Handler -from unblob.processing import DEFAULT_DEPTH, process_file TEST_DATA_PATH = Path(__file__).parent / "integration" TEST_INPUT_DIRS = list(TEST_DATA_PATH.glob("**/__input__")) @@ -39,12 +38,7 @@ def test_all_handlers(input_dir: Path, output_dir: Path, tmp_path: Path): list(input_dir.iterdir()) != [] ), f"Integration test input dir should contain at least 1 file: {input_dir}" - process_file( - root=input_dir, - path=input_dir, - extract_root=tmp_path, - max_depth=DEFAULT_DEPTH, - ) + unblob(files=input_dir, extract_root=tmp_path, verbose=True) diff_command = [ "diff", diff --git a/unblob/__init__.py b/unblob/__init__.py index e69de29bb2..2d5e7ac322 100644 --- a/unblob/__init__.py +++ b/unblob/__init__.py @@ -0,0 +1,7 @@ +""" +unblob is a tool for getting information out of any kind of binary blob. +""" + +from unblob.cli import unblob + +__all__ = ["unblob"] diff --git a/unblob/cli.py b/unblob/cli.py index c3f3ffbbc9..fd7f3d1485 100644 --- a/unblob/cli.py +++ b/unblob/cli.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 import sys from pathlib import Path -from typing import Tuple +from typing import Tuple, Union import click from structlog import get_logger @@ -35,15 +35,37 @@ help="Recursion depth. How deep should we extract containers.", ) @click.option("-v", "--verbose", is_flag=True, help="Verbose mode, enable debug logs.") -def cli(files: Tuple[Path], extract_root: Path, depth: int, verbose: bool): +def cli(files: Tuple[Path], extract_root: Path, depth: int, verbose: bool) -> int: + return unblob(files=files, extract_root=extract_root, depth=depth, verbose=verbose) + + +def unblob( + files: Union[Path, Tuple[Path]], + extract_root: Path, + depth: int = DEFAULT_DEPTH, + verbose: bool = False, +) -> int: + """Calls unblob for a files. + + Returns with zero for success, non-zero integer in case of error(s). + """ configure_logger(verbose, extract_root) + + if not isinstance(files, Tuple): + files = (files,) logger.info("Start processing files", count=noformat(len(files))) - for path in files: - root = path if path.is_dir() else path.parent - process_file(root, path, extract_root, max_depth=depth) + try: + for path in files: + root = path if path.is_dir() else path.parent + process_file(root, path, extract_root, max_depth=depth) + except Exception: + logger.exception("Unhandled exception during unblob") + return 1 + return exit_code_var.get(0) -def main(): + +def main() -> None: try: # Click argument parsing ctx = cli.make_context("unblob", sys.argv[1:]) @@ -56,14 +78,8 @@ def main(): logger.exception("Unhandled exception during unblob") sys.exit(1) - try: - with ctx: - cli.invoke(ctx) - except Exception: - logger.exception("Unhandled exception during unblob") - sys.exit(1) - - sys.exit(exit_code_var.get(0)) + rc = cli.invoke(ctx) + sys.exit(rc) if __name__ == "__main__": diff --git a/unblob/logging.py b/unblob/logging.py index a55c8efb0f..82bdbcc979 100644 --- a/unblob/logging.py +++ b/unblob/logging.py @@ -53,6 +53,13 @@ def convert_type(logger, method_name: str, event_dict: structlog.types.EventDict def configure_logger(verbose: bool, extract_root: Path): + if structlog.is_configured: + # If used as a library, with already configured structlog, we still need our types to be prettyly printed + processors = structlog.get_config().get("processors", []) + processors.insert(0, pretty_print_types(extract_root)) + structlog.configure(processors=processors) + return + log_level = logging.DEBUG if verbose else logging.INFO processors = [ structlog.stdlib.add_log_level,