Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make usable as library #116

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ unblob is a tool for getting information out of any kind of binary blob.

## Quickstart

### Using from Docker container

Unblob can be used right away from a `docker` container: \
`ghcr.io/iot-inspector/unblob:latest`

Expand All @@ -15,7 +17,7 @@ docker run \
--rm \
--pull always \
-v /path/to/out/dir/on/host:/data/output \
-v /path/to/files/on/host:/data/input \
-v /path/to/input/files/on/host:/data/input \
ghcr.io/iot-inspector/unblob:latest /data/input/path/to/file
```

Expand All @@ -24,7 +26,31 @@ Help on usage:
docker run --rm --pull always ghcr.io/iot-inspector/unblob:latest --help
```


### Using as a library

```python
from pathlib import Path
import unblob

files = Path("/path/to/input/file")
# or with multiple files
files = (
Path("/path/to/input/file/1"),
Path("/path/to/input/file/2"),
)
extract_root = Path("/path/to/out/dir")
# optional parameters
depth = 5
verbose = True

return_code = unblob.unblob(
files=files,
extract_root=extract_root,
depth=depth,
verbose=verbose
)
print(f"return_code: {return_code}")
```
## Development

### Dependencies
Expand Down
10 changes: 2 additions & 8 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@

import pytest

from unblob import handlers
from unblob import handlers, unblob
from unblob.models import Handler
from unblob.processing import DEFAULT_DEPTH, process_file

TEST_DATA_PATH = Path(__file__).parent / "integration"
TEST_INPUT_DIRS = list(TEST_DATA_PATH.glob("**/__input__"))
Expand All @@ -39,12 +38,7 @@ def test_all_handlers(input_dir: Path, output_dir: Path, tmp_path: Path):
list(input_dir.iterdir()) != []
), f"Integration test input dir should contain at least 1 file: {input_dir}"

process_file(
root=input_dir,
path=input_dir,
extract_root=tmp_path,
max_depth=DEFAULT_DEPTH,
)
unblob(files=input_dir, extract_root=tmp_path, verbose=True)

diff_command = [
"diff",
Expand Down
7 changes: 7 additions & 0 deletions unblob/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
unblob is a tool for getting information out of any kind of binary blob.
"""

from unblob.cli import unblob

__all__ = ["unblob"]
44 changes: 30 additions & 14 deletions unblob/cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
import sys
from pathlib import Path
from typing import Tuple
from typing import Tuple, Union

import click
from structlog import get_logger
Expand Down Expand Up @@ -35,15 +35,37 @@
help="Recursion depth. How deep should we extract containers.",
)
@click.option("-v", "--verbose", is_flag=True, help="Verbose mode, enable debug logs.")
def cli(files: Tuple[Path], extract_root: Path, depth: int, verbose: bool):
def cli(files: Tuple[Path], extract_root: Path, depth: int, verbose: bool) -> int:
return unblob(files=files, extract_root=extract_root, depth=depth, verbose=verbose)


def unblob(
files: Union[Path, Tuple[Path]],
extract_root: Path,
depth: int = DEFAULT_DEPTH,
verbose: bool = False,
) -> int:
"""Calls unblob for a files.

Returns with zero for success, non-zero integer in case of error(s).
"""
configure_logger(verbose, extract_root)

if not isinstance(files, Tuple):
files = (files,)
logger.info("Start processing files", count=noformat(len(files)))
for path in files:
root = path if path.is_dir() else path.parent
process_file(root, path, extract_root, max_depth=depth)
try:
for path in files:
root = path if path.is_dir() else path.parent
process_file(root, path, extract_root, max_depth=depth)
except Exception:
logger.exception("Unhandled exception during unblob")
return 1

return exit_code_var.get(0)

def main():

def main() -> None:
try:
# Click argument parsing
ctx = cli.make_context("unblob", sys.argv[1:])
Expand All @@ -56,14 +78,8 @@ def main():
logger.exception("Unhandled exception during unblob")
sys.exit(1)

try:
with ctx:
cli.invoke(ctx)
except Exception:
logger.exception("Unhandled exception during unblob")
sys.exit(1)

sys.exit(exit_code_var.get(0))
rc = cli.invoke(ctx)
sys.exit(rc)


if __name__ == "__main__":
Expand Down
7 changes: 7 additions & 0 deletions unblob/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ def convert_type(logger, method_name: str, event_dict: structlog.types.EventDict


def configure_logger(verbose: bool, extract_root: Path):
if structlog.is_configured:
# If used as a library, with already configured structlog, we still need our types to be prettyly printed
processors = structlog.get_config().get("processors", [])
processors.insert(0, pretty_print_types(extract_root))
structlog.configure(processors=processors)
return

log_level = logging.DEBUG if verbose else logging.INFO
processors = [
structlog.stdlib.add_log_level,
Expand Down