Skip to content

Commit

Permalink
ENH: Rewrite JoblibParallelization following the discussion at anyopt…
Browse files Browse the repository at this point in the history
  • Loading branch information
Taher Chegini committed Jan 10, 2024
1 parent f49596f commit a307d7b
Showing 1 changed file with 190 additions and 7 deletions.
197 changes: 190 additions & 7 deletions pymoo/core/problem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from __future__ import annotations

from abc import abstractmethod
from pathlib import Path
from typing import Any, Callable, Generator, Iterable, Literal

import numpy as np

Expand All @@ -8,8 +12,10 @@

try:
import ray
import joblib
except ImportError:
ray = None
joblib = None


class ElementwiseEvaluationFunction:
Expand Down Expand Up @@ -64,19 +70,196 @@ def __getstate__(self):


class JoblibParallelization:
    """Parallel evaluation of a function over an iterable using joblib.

    Instances are callable: ``parallelization(f, X)`` evaluates ``f`` on
    every element of ``X`` using a :class:`joblib.Parallel` pool configured
    from the constructor arguments.

    Parameters
    ----------
    n_jobs: int, default: -1
        The maximum number of concurrently running jobs, such as the number
        of Python worker processes when backend="multiprocessing"
        or the size of the thread-pool when backend="threading".
        If -1 all CPUs are used.
        If 1 is given, no parallel computing code is used at all, and the
        behavior amounts to a simple python ``for`` loop. This mode is not
        compatible with ``timeout``.
        For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for
        n_jobs = -2, all CPUs but one are used.
        None is a marker for 'unset' that will be interpreted as n_jobs=1
        unless the call is performed under a :func:`~parallel_config`
        context manager that sets another value for ``n_jobs``.
    backend: str, default: 'loky'
        Specify the parallelization backend implementation.
        Supported backends are:

        - "loky" used by default, can induce some
          communication and memory overhead when exchanging input and
          output data with the worker Python processes. On some rare
          systems (such as Pyodide), the loky backend may not be
          available.
        - "multiprocessing" previous process-based backend based on
          ``multiprocessing.Pool``. Less robust than ``loky``.
        - "threading" is a very low-overhead backend but it suffers
          from the Python Global Interpreter Lock if the called function
          relies a lot on Python objects. "threading" is mostly useful
          when the execution bottleneck is a compiled extension that
          explicitly releases the GIL (for instance a Cython loop wrapped
          in a "with nogil" block or an expensive call to a library such
          as NumPy).

        It is not recommended to hard-code the backend name in a call to
        :class:`~Parallel` in a library. Instead it is recommended to set
        soft hints (prefer) or hard constraints (require) so as to make it
        possible for library users to change the backend from the outside
        using the :func:`~parallel_config` context manager.
    return_as: {'list', 'generator'}, default: 'list'
        If 'list', calls to this instance will return a list, only when
        all results have been processed and retrieved.
        If 'generator', it will return a generator that yields the results
        as soon as they are available, in the order the tasks have been
        submitted with.
        Future releases are planned to also support 'generator_unordered',
        in which case the generator immediately yields available results
        independently of the submission order.
    prefer: {'processes', 'threads'} or None, default: None
        Soft hint to choose the default backend if no specific backend
        was selected with the :func:`~parallel_config` context manager.
        The default process-based backend is 'loky' and the default
        thread-based backend is 'threading'. Ignored if the ``backend``
        parameter is specified.
    require: 'sharedmem' or None, default: None
        Hard constraint to select the backend. If set to 'sharedmem',
        the selected backend will be single-host and thread-based even
        if the user asked for a non-thread based backend with
        :func:`~joblib.parallel_config`.
    verbose: int, optional, default: 0
        The verbosity level: if non zero, progress messages are
        printed. Above 50, the output is sent to stdout.
        The frequency of the messages increases with the verbosity level.
        If it is more than 10, all iterations are reported.
    timeout: float, optional, default: None
        Timeout limit for each task to complete. If any task takes longer,
        a TimeoutError will be raised. Only applied when n_jobs != 1.
    pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}, default: '2 * n_jobs'
        The number of batches (of tasks) to be pre-dispatched.
        Default is '2 * n_jobs'. When batch_size="auto" this is a
        reasonable default and the workers should never starve. Note that
        only basic arithmetic is allowed here and no modules can be used
        in this expression.
    batch_size: int or 'auto', default: 'auto'
        The number of atomic tasks to dispatch at once to each
        worker. When individual evaluations are very fast, dispatching
        calls to workers can be slower than sequential computation because
        of the overhead. Batching fast computations together can mitigate
        this.
        The ``'auto'`` strategy keeps track of the time it takes for a
        batch to complete, and dynamically adjusts the batch size to keep
        the time on the order of half a second, using a heuristic. The
        initial batch size is 1.
        ``batch_size="auto"`` with ``backend="threading"`` will dispatch
        batches of a single task at a time as the threading backend has
        very little overhead and using larger batch size has not proved to
        bring any gain in that case.
    temp_folder: str, optional, default: None
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:

        - a folder pointed by the JOBLIB_TEMP_FOLDER environment
          variable,
        - /dev/shm if the folder exists and is writable: this is a
          RAM disk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with TMP, TMPDIR or TEMP environment
          variables, typically /tmp under Unix operating systems.

        Only active when backend="loky" or "multiprocessing".
    max_nbytes: int, str, or None, optional, default: '1M'
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder. Can be an int
        in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
        Use None to disable memmapping of large arrays.
        Only active when backend="loky" or "multiprocessing".
    mmap_mode: {'r+', 'r', 'w+', 'c'} or None, default: 'r'
        Memmapping mode for numpy arrays passed to workers. None will
        disable memmapping, other modes defined in the numpy.memmap doc:
        https://numpy.org/doc/stable/reference/generated/numpy.memmap.html
        Also, see 'max_nbytes' parameter documentation for more details.

    Raises
    ------
    ImportError
        If joblib is not installed.
    """

    def __init__(
        self,
        n_jobs: int = -1,
        backend: Literal["loky", "threading", "multiprocessing"] = "loky",
        return_as: Literal["list", "generator"] = "list",
        verbose: int = 0,
        timeout: float | None = None,
        pre_dispatch: str | int = "2 * n_jobs",
        batch_size: int | Literal["auto"] = "auto",
        temp_folder: str | Path | None = None,
        max_nbytes: int | str | None = "1M",
        mmap_mode: Literal["r+", "r", "w+", "c"] | None = "r",
        prefer: Literal["processes", "threads"] | None = None,
        require: Literal["sharedmem"] | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # *args/**kwargs are accepted (and ignored) only for backward
        # compatibility with the legacy constructor signature
        # (aJoblibParallel, aJoblibDelayed).
        if joblib is None:
            msg = (
                "joblib must be installed! "
                "You can install joblib with the command: "
                "`pip install -U joblib psutil`"
            )
            raise ImportError(msg)
        self.n_jobs = n_jobs
        self.backend = backend
        self.return_as = return_as
        self.verbose = verbose
        self.timeout = timeout
        self.pre_dispatch = pre_dispatch
        self.batch_size = batch_size
        self.temp_folder = temp_folder
        self.max_nbytes = max_nbytes
        self.mmap_mode = mmap_mode
        self.prefer = prefer
        self.require = require
        super().__init__()

    def __call__(
        self,
        f: Callable[..., Any],
        X: Iterable[Any],
    ) -> list[Any] | Generator[Any, Any, None]:
        """Evaluate ``f`` on each element of ``X`` with a joblib pool.

        Returns a list (or, if ``return_as='generator'``, a generator) of
        ``f(x)`` results in submission order.
        """
        with joblib.Parallel(
            n_jobs=self.n_jobs,
            backend=self.backend,
            return_as=self.return_as,
            verbose=self.verbose,
            timeout=self.timeout,
            pre_dispatch=self.pre_dispatch,
            batch_size=self.batch_size,
            temp_folder=self.temp_folder,
            max_nbytes=self.max_nbytes,
            mmap_mode=self.mmap_mode,
            prefer=self.prefer,
            require=self.require,
        ) as parallel:
            return parallel(joblib.delayed(f)(x) for x in X)

    def __getstate__(self):
        """Return a picklable copy of the instance state.

        Only the legacy ``parallel``/``delayed`` joblib callables are
        dropped (they may not be picklable). All plain configuration
        attributes (n_jobs, backend, ...) are picklable and MUST be kept:
        the previous implementation popped every configuration attribute,
        which left an unpickled instance with an empty ``__dict__`` so any
        later ``__call__`` failed with AttributeError.
        """
        state = self.__dict__.copy()
        state.pop("parallel", None)
        state.pop("delayed", None)
        return state


Expand Down

0 comments on commit a307d7b

Please sign in to comment.