feat: add backend dev docs
dPys committed Dec 27, 2024
1 parent a5d6bcb commit 08c3413
Showing 4 changed files with 194 additions and 89 deletions.
105 changes: 105 additions & 0 deletions doc/usage.md
@@ -114,3 +114,108 @@ docker-compose -f docker/docker-compose.cpu.yaml run --rm nxbench --config 'nxbe
```bash
docker-compose -f docker/docker-compose.cpu.yaml run --rm nxbench --config 'nxbench/configs/example.yaml' benchmark export 'nxbench_results/9e3e8baa4a3443c392dc8fee00373b11_20241220002902.json' --output-format csv --output-file 'nxbench_results/results.csv'
```

## Adding a New Backend

> **Note:** The following guide assumes you have a recent version of NxBench with the new `BackendManager` and associated tools (e.g., [`core.py`](../nxbench/backends/core.py) and [`registry.py`](../nxbench/backends/registry.py)) already in place. It also assumes that your backend follows the [guidelines for developing custom NetworkX backends](https://networkx.org/documentation/stable/reference/backends.html#docs-for-backend-developers).

### 1. Verify Your Backend is Installable

1. **Install** your backend via `pip` (or conda, etc.).
   For example, if your backend library is `my_cool_backend`, ensure that the following succeeds:

   ```bash
   pip install my_cool_backend
   ```

2. **Check import**: NxBench’s detection system simply looks for `importlib.util.find_spec("my_cool_backend")`, so if your library cannot be found by Python, NxBench will conclude it is unavailable.
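This availability check can be reproduced in a few lines. A minimal sketch of the `find_spec`-based detection (`my_cool_backend` is, as above, a placeholder name):

```python
import importlib.util

def is_backend_available(import_name: str) -> bool:
    """Mirror the detection idea: a backend counts as installed
    only if Python can locate its module on the current path."""
    return importlib.util.find_spec(import_name) is not None

print(is_backend_available("json"))             # True: stdlib module is always found
print(is_backend_available("my_cool_backend"))  # False unless you actually installed it
```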

### 2. Write a Conversion Function

In NxBench, a “backend” is simply a library or extension that **converts a `networkx.Graph` into an alternate representation**. You must define one or more **conversion** functions:

```python
import networkx

def convert_my_cool_backend(nx_graph: networkx.Graph, num_threads: int):
    import my_cool_backend

    # Possibly configure multi-threading if relevant:
    # my_cool_backend.configure_threads(num_threads)

    # Convert the NetworkX graph to your library's internal representation:
    return my_cool_backend.from_networkx(nx_graph)
```

### 3. (Optional) Write a Teardown Function

If your backend has special cleanup needs (e.g., free GPU memory, close connections, revert global state, etc.), define a teardown function:

```python
def teardown_my_cool_backend():
    import my_cool_backend

    # e.g. my_cool_backend.shutdown()
    pass
```

If your backend doesn’t need cleanup, skip this or simply define an empty function.

### 4. Register with NxBench

Locate NxBench’s [registry.py](../nxbench/backends/registry.py) (or a similar file where other backends are registered). Add your calls to `backend_manager.register_backend(...)`:

```python
from nxbench.backends.registry import backend_manager
import networkx as nx  # only if needed

def convert_my_cool_backend(nx_graph: nx.Graph, num_threads: int):
    import my_cool_backend

    # Possibly configure my_cool_backend with num_threads
    return my_cool_backend.from_networkx(nx_graph)

def teardown_my_cool_backend():
    # e.g. release resources
    pass

backend_manager.register_backend(
    name="my_cool_backend",          # the name NxBench will use to refer to it
    import_name="my_cool_backend",   # the importable Python module name
    conversion_func=convert_my_cool_backend,
    teardown_func=teardown_my_cool_backend,  # optional
)
```

**Important**:

- `name` is the “human-readable” alias in NxBench.
- `import_name` is the actual module import path. They can be the same (most common) or different if your library’s PyPI name differs from its Python import path.
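To see why the two names matter, consider a toy registry that maps aliases to importable module paths — a sketch, not NxBench's actual `BackendManager` (`mycool` is a hypothetical module whose distribution name differs from its import path, and `json` merely stands in for any installed backend):

```python
import importlib.util

# alias (NxBench `name`) -> module path (NxBench `import_name`)
registry = {
    "always_importable": "json",   # stand-in for an installed backend
    "my_cool_backend": "mycool",   # hypothetical: PyPI name != import path
}

def available_backends(reg: dict[str, str]) -> list[str]:
    """Keep only aliases whose underlying module can actually be imported."""
    return [alias for alias, mod in reg.items()
            if importlib.util.find_spec(mod) is not None]
```

Detection always probes `import_name`, while configs and logs refer to `name`.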

### 5. Confirm It Works

1. **Check NxBench logs**: When NxBench runs, it will detect whether `"my_cool_backend"` is installed by calling `importlib.util.find_spec("my_cool_backend")`.
2. **Run a quick benchmark**:

```bash
nxbench --config my_config.yaml benchmark run
```

If you see logs like “Chosen backends: [‘my_cool_backend’ …]” then NxBench recognized your backend. If it fails with “No valid backends found,” ensure your library is installed and spelled correctly.

### 6. (Optional) Version Pinning

If you want NxBench to only run your backend if it matches a pinned version (e.g. `my_cool_backend==2.1.0`), add something like this to your NxBench config YAML:

```yaml
environ:
  backend:
    my_cool_backend:
      - "my_cool_backend==2.1.0"
```
NxBench will:

- Detect the installed version automatically (via `my_cool_backend.__version__` or the installed package metadata)
- Skip running your backend if the version doesn’t match `2.1.0`.
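Under the hood, a pin like this amounts to comparing the spec against the installed distribution's version. A minimal sketch of such a check, assuming a plain `pkg==X.Y.Z` pin (illustrative, not NxBench's actual implementation):

```python
from importlib import metadata

def parse_pin(requirement: str) -> tuple[str, str]:
    """Split 'pkg==X.Y.Z' into (package, pinned_version)."""
    pkg, _, pinned = requirement.partition("==")
    return pkg, pinned

def version_matches(requirement: str) -> bool:
    """True only if the package is installed and its version equals the pin."""
    pkg, pinned = parse_pin(requirement)
    try:
        installed = metadata.version(pkg)
    except metadata.PackageNotFoundError:
        return False  # not installed at all -> skip the backend
    return installed == pinned
```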

---

### That’s it

You’ve successfully added a new backend to NxBench! Now, NxBench can detect it, convert graphs for it, optionally tear it down, and track its version during benchmarking.
13 changes: 13 additions & 0 deletions nxbench/backends/registry.py
@@ -31,6 +31,19 @@ def teardown_networkx():
# ---- Nx-Parallel backend ----
def convert_parallel(original_graph: nx.Graph, num_threads: int):
    nxp = import_module("nx_parallel")
    from joblib import cpu_count

    total_cores = cpu_count()

    n_jobs = min(num_threads, total_cores)

    nx.config.backends.parallel.active = True
    nx.config.backends.parallel.n_jobs = n_jobs
    nx.config.backends.parallel.backend = "loky"
    if hasattr(nx.config.backends.parallel, "inner_max_num_threads"):
        nx.config.backends.parallel.inner_max_num_threads = max(
            total_cores // n_jobs, 1
        )

    return nxp.ParallelGraph(original_graph)
50 changes: 32 additions & 18 deletions nxbench/benchmarking/benchmark.py
@@ -36,10 +36,12 @@
"PREFECT_API_DATABASE_CONNECTION_URL",
"postgresql+asyncpg://prefect_user:pass@localhost:5432/prefect_db",
)
os.environ.setdefault("PREFECT_ORION_DATABASE_CONNECTION_POOL_SIZE", "5")
os.environ.setdefault("PREFECT_ORION_DATABASE_CONNECTION_MAX_OVERFLOW", "10")
os.environ.setdefault("PREFECT_ORION_DATABASE_CONNECTION_POOL_SIZE", "10")
os.environ.setdefault("PREFECT_ORION_DATABASE_CONNECTION_MAX_OVERFLOW", "20")
os.environ.setdefault("PREFECT_API_URL", "http://127.0.0.1:4200/api")
os.environ.setdefault("PREFECT_ORION_API_ENABLE_TASK_RUN_DATA_PERSISTENCE", "false")
os.environ.setdefault("PREFECT_CLIENT_REQUEST_TIMEOUT", "60")
os.environ.setdefault("PREFECT_HTTPX_SETTINGS", '{"limits": {"max_connections": 50}')
os.environ.setdefault("MAX_WORKERS", "4")

run_uuid = uuid.uuid4().hex
@@ -85,53 +85,65 @@ def configure_backend(original_graph: nx.Graph, backend: str, num_thread: int) -
def run_algorithm(
    graph: Any, algo_config: AlgorithmConfig, num_thread: int, backend: str
) -> tuple[Any, float, int, str | None]:
    """
    Attempt to run the algorithm on the configured backend, but gracefully
    skip if that algorithm is not actually implemented for this backend.
    """
    logger = get_run_logger()

    try:
        # retrieve the callable for the chosen backend
        algo_func = algo_config.get_callable(backend)
    except ImportError as e:
        logger.exception(
            f"Could not get a callable for {algo_config.name} from {backend}."
        )
        return None, 0.0, 0, str(e)

    # parse and prepare the parameters
    pos_args, kwargs = process_algorithm_params(algo_config.params)

    kwargs = add_seeding(kwargs, algo_func, algo_config.name)

    error = None
    original_env = {}
    vars_to_set = [
        "NUM_THREAD",
        "OMP_NUM_THREADS",
        "MKL_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
    ]
    try:
        for var_name in vars_to_set:
            original_env[var_name] = os.environ.get(var_name)
            os.environ[var_name] = str(num_thread)

        with memory_tracker() as mem:
            start_time = time.perf_counter()
            # pass the graph plus the processed pos_args and kwargs
            try:
                result = algo_func(graph, *pos_args, **kwargs)
            except NotImplementedError as nie:
                logger.info(
                    f"Skipping {algo_config.name} for backend '{backend}' because "
                    f"it's not implemented (NotImplementedError)."
                )
                return None, 0.0, 0, str(nie)
            end_time = time.perf_counter()

        execution_time = end_time - start_time
        peak_memory = mem["peak"]
        logger.debug(f"Algorithm '{algo_config.name}' executed successfully.")

    except Exception as e:
        logger.exception("Algorithm run failed")
        execution_time = time.perf_counter() - start_time
        peak_memory = mem.get("peak", 0)
        result = None
        error = str(e)

    finally:
        # restore environment
        for var_name in vars_to_set:
            if original_env[var_name] is None:
                del os.environ[var_name]
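The save-and-restore pattern around the thread environment variables generalizes naturally to a context manager. A sketch of the same idea (not part of NxBench; names and defaults are illustrative):

```python
import os
from contextlib import contextmanager

@contextmanager
def thread_env(num_threads: int, var_names=("OMP_NUM_THREADS", "MKL_NUM_THREADS")):
    """Temporarily set thread-count env vars, restoring prior values on exit."""
    saved = {name: os.environ.get(name) for name in var_names}
    try:
        for name in var_names:
            os.environ[name] = str(num_threads)
        yield
    finally:
        for name, old in saved.items():
            if old is None:
                os.environ.pop(name, None)  # var did not exist before
            else:
                os.environ[name] = old      # put the original value back
```

Wrapping the benchmark call in `with thread_env(num_thread): ...` guarantees restoration even on exceptions, mirroring what the `finally` block above does by hand.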
Expand Down
