Merge pull request #512 from Neuraxio/add-windows-compatibility
Add Windows support + do some cleaning
guillaume-chevalier authored Oct 15, 2021
2 parents 1148253 + bf0f14d commit e9cc89d
Showing 11 changed files with 369 additions and 360 deletions.
84 changes: 52 additions & 32 deletions examples/parallel/plot_streaming_pipeline.py
@@ -3,9 +3,27 @@
===================================================================
This demonstrates how to stream data in parallel in a Neuraxle pipeline.
The pipeline steps' parallelism here will be obvious.
The pipeline has two steps:
1. Preprocessing: the step that processes the data simply sleeps.
2. Model: the model simply multiplies the data by two.
This can be used with scikit-learn as well as with any other library,
such as TensorFlow, to transform things in parallel.
Pipelines benchmarked:
1. We first use a classical pipeline and evaluate the time.
2. Then we use a minibatched pipeline and we evaluate the time.
3. Then we use a parallel pipeline and we evaluate the time.
We expect the parallel pipeline to be faster due to having more workers
in parallel, as well as starting the model's transformations at the same
time that other batches are being preprocessed, using queues.
..
Copyright 2019, Neuraxio Inc.
Copyright 2021, Neuraxio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -25,47 +25,49 @@
import numpy as np

from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.pipeline import Pipeline
from neuraxle.pipeline import BasePipeline, Pipeline, MiniBatchSequentialPipeline
from neuraxle.steps.loop import ForEach
from neuraxle.steps.misc import Sleep
from neuraxle.steps.numpy import MultiplyByN


def main():
"""
Process tasks of batch size 10 with 8 queued workers that have a max queue size of 10.
Each task does the following: for each data input, sleep 0.02 seconds, and multiply by 2.
"""
sleep_time = 0.02
p = SequentialQueuedPipeline([
Pipeline([ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
], n_workers_per_step=8, max_queue_size=10, batch_size=10)

def eval_run_time(pipeline: BasePipeline):
pipeline.setup()
a = time.time()
outputs_streaming = p.transform(list(range(100)))
output = pipeline.transform(list(range(100)))
b = time.time()
time_queued_pipeline = b - a
print('SequentialQueuedPipeline')
print('execution time: {} seconds'.format(time_queued_pipeline))
seconds = b - a
return seconds, output


def main():
"""
Process data inputs sequentially.
For each data input, sleep 0.02 seconds, and then multiply by 2.
The task is to sleep 0.02 seconds for each data input and then multiply by 2.
"""
p = Pipeline([
Pipeline([ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
])

a = time.time()
outputs_vanilla = p.transform(list(range(100)))
b = time.time()
time_vanilla_pipeline = b - a

print('VanillaPipeline')
print('execution time: {} seconds'.format(time_vanilla_pipeline))

assert time_queued_pipeline < time_vanilla_pipeline
assert np.array_equal(outputs_streaming, outputs_vanilla)
sleep_time = 0.02
preprocessing_and_model_steps = [ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]

# Classical pipeline - all at once with one big batch:
p = Pipeline(preprocessing_and_model_steps)
time_vanilla_pipeline, output_classical = eval_run_time(p)
print(f"Classical 'Pipeline' execution time: {time_vanilla_pipeline} seconds.")

# Classical minibatch pipeline - minibatch size 10:
p = MiniBatchSequentialPipeline(preprocessing_and_model_steps,
batch_size=10)
time_minibatch_pipeline, output_minibatch = eval_run_time(p)
print(f"Minibatched 'MiniBatchSequentialPipeline' execution time: {time_minibatch_pipeline} seconds.")

# Parallel pipeline - minibatch size 10 with 8 workers per step that
# have a max queue size of 5 batches between preprocessing and the model:
p = SequentialQueuedPipeline(preprocessing_and_model_steps,
n_workers_per_step=8, max_queue_size=5, batch_size=10)
time_parallel_pipeline, output_parallel = eval_run_time(p)
print(f"Parallel 'SequentialQueuedPipeline' execution time: {time_parallel_pipeline} seconds.")

assert time_parallel_pipeline < time_minibatch_pipeline, str((time_parallel_pipeline, time_minibatch_pipeline))
assert np.array_equal(output_classical, output_minibatch)
assert np.array_equal(output_classical, output_parallel)


if __name__ == '__main__':
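The module docstring above attributes the parallel speed-up to queued workers that start the model's transformations while other batches are still being preprocessed. As a hedged sketch that is not part of this commit, the same queued step can also be run with threads instead of processes through the `use_processes` and `use_savers` parameters documented for `BaseQueuedPipeline` below, which avoids serializing the steps with savers (a point that matters on Windows, where worker processes are spawned rather than forked):

from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.steps.loop import ForEach
from neuraxle.steps.misc import Sleep
from neuraxle.steps.numpy import MultiplyByN

# Same preprocessing and model steps as in the example above.
preprocessing_and_model_steps = [ForEach(Sleep(sleep_time=0.02)), MultiplyByN(2)]

# Threads instead of processes: no savers are needed to serialize the steps.
p = SequentialQueuedPipeline(
    preprocessing_and_model_steps,
    n_workers_per_step=8, max_queue_size=5, batch_size=10,
    use_processes=False, use_savers=False,
)
p.setup()
outputs = p.transform(list(range(100)))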
2 changes: 1 addition & 1 deletion neuraxle/base.py
@@ -2242,7 +2242,7 @@ def _teardown(self) -> 'BaseTransformer':
:return: self
"""
self.is_initialized = False
return self
return RecursiveDict()

def __del__(self):
try:
21 changes: 10 additions & 11 deletions neuraxle/distributed/streaming.py
@@ -59,7 +59,7 @@ class ObservableQueueMixin(MixinForBaseTransformer):
:class:`SequentialQueuedPipeline`
"""

def __init__(self, queue):
def __init__(self, queue: Queue):
MixinForBaseTransformer.__init__(self)
self.queue = queue
self.observers = []
@@ -176,7 +176,7 @@ def __init__(
additional_worker_arguments = [[] for _ in range(n_workers)]

MetaStep.__init__(self, wrapped)
ObservableQueueMixin.__init__(self, Queue(maxsize=max_queue_size))
ObservableQueueMixin.__init__(self, Queue(maxsize=max_queue_size)) # max_queue_size is in batches

self.use_processes: bool = use_processes
self.workers: List[Process] = []
@@ -345,17 +345,16 @@ class BaseQueuedPipeline(MiniBatchSequentialPipeline):
], batch_size=10, max_queue_size=5)
outputs = p.transform(list(range(100)))
:param steps: pipeline steps
:param batch_size: number of elements to combine into a single batch
:param n_workers_per_step: number of workers to spawn per step
:param max_queue_size: max number of elements inside the processing queue
:param data_joiner: transformer step to join streamed batches together at the end of the pipeline
:param steps: pipeline steps.
:param batch_size: number of elements to combine into a single batch.
:param n_workers_per_step: number of workers to spawn per step.
:param max_queue_size: max number of batches inside the processing queue between the workers.
:param data_joiner: transformer step to join streamed batches together at the end of the pipeline.
:param use_processes: use processes instead of threads for parallel processing. multiprocessing.context.Process is used by default.
:param use_savers: use savers to serialize steps for parallel processing. Recommended if using processes instead of threads.
:param keep_incomplete_batch: (Optional.) A bool representing
whether the last batch should be dropped in the case it has fewer than
`batch_size` elements; the default behavior is to keep the smaller
batch.
:param keep_incomplete_batch: (Optional.) A bool that indicates whether
or not the last batch should be dropped in the case it has fewer than
`batch_size` elements; the default behavior is to keep the smaller batch.
:param default_value_data_inputs: expected_outputs default fill value
for padding and values outside iteration range, or :class:`~neuraxle.data_container.DataContainer.AbsentValuesNullObject`
to trim absent values from the batch
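As a hedged illustration of the `keep_incomplete_batch` and `default_value_data_inputs` parameters documented above (not part of this commit; the import path of `AbsentValuesNullObject` is assumed to be `neuraxle.data_container`), 25 data inputs with `batch_size=10` leave a final batch of only 5 elements, which is kept and trimmed rather than padded:

from neuraxle.data_container import AbsentValuesNullObject
from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.steps.numpy import MultiplyByN

p = SequentialQueuedPipeline(
    [MultiplyByN(2)],
    batch_size=10, n_workers_per_step=2, max_queue_size=5,
    keep_incomplete_batch=True,
    default_value_data_inputs=AbsentValuesNullObject(),  # trim absent values instead of padding
)
p.setup()
outputs = p.transform(list(range(25)))  # the last batch holds only 5 elements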
63 changes: 30 additions & 33 deletions neuraxle/metaopt/auto_ml.py
@@ -41,8 +41,7 @@

import numpy as np

from neuraxle.base import BaseStep, ExecutionContext, ForceHandleMixin, ExecutionPhase, _HasChildrenMixin, \
LOGGER_FORMAT, DATE_FORMAT
from neuraxle.base import BaseStep, ExecutionContext, ForceHandleMixin, ExecutionPhase, _HasChildrenMixin
from neuraxle.data_container import DataContainer
from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace
from neuraxle.metaopt.callbacks import BaseCallback, CallbackList, ScoringCallback
@@ -54,6 +53,7 @@
class HyperparamsRepository(_Observable[Tuple['HyperparamsRepository', Trial]], ABC):
"""
Hyperparams repository that saves hyperparams, and scores for every AutoML trial.
The cache folder can be changed to keep different rounds of trials separate.
.. seealso::
:class:`AutoML`,
@@ -66,10 +66,15 @@ class HyperparamsRepository(_Observable[Tuple['HyperparamsRepository', Trial]],
:class:`~neuraxle.hyperparams.space.HyperparameterSamples`
"""

def __init__(self, hyperparameter_selection_strategy=None, cache_folder=None, best_retrained_model_folder=None):
def __init__(
self,
hyperparameter_selection_strategy: 'BaseHyperparameterSelectionStrategy' = None,
cache_folder: str = None,
best_retrained_model_folder: str = None,
):
super().__init__()
if cache_folder is None:
cache_folder = 'trials'
cache_folder = os.path.join(f'{self.__class__.__name__}', 'trials')
if best_retrained_model_folder is None:
best_retrained_model_folder = os.path.join(cache_folder, 'best')
self.best_retrained_model_folder = best_retrained_model_folder
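A hedged illustration (not part of this commit) of the new default above: the cache folder is now namespaced by the repository class name instead of the previous flat 'trials' folder. The class name below is hypothetical and only for illustration:

import os

class_name = 'MyHyperparamsRepository'  # hypothetical subclass name, for illustration only
cache_folder = os.path.join(class_name, 'trials')
best_retrained_model_folder = os.path.join(cache_folder, 'best')
print(cache_folder)                 # MyHyperparamsRepository/trials (on POSIX; backslashes on Windows)
print(best_retrained_model_folder)  # MyHyperparamsRepository/trials/best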
@@ -155,7 +160,7 @@ def save_best_model(self, step: BaseStep):
self._save_best_model(step, trial_hash)
return step

def new_trial(self, auto_ml_container: 'AutoMLContainer'):
def new_trial(self, auto_ml_container: 'AutoMLContainer') -> Trial:
"""
Create a new trial with the best next hyperparams.
@@ -164,19 +169,16 @@ def new_trial(self, auto_ml_container: 'AutoMLContainer'):
:return: trial
"""
hyperparams = self.hyperparameter_selection_strategy.find_next_best_hyperparams(auto_ml_container)
logger = self._create_logger_for_trial(auto_ml_container.trial_number)
logger.info('\nnew trial: {}'.format(json.dumps(hyperparams.to_nested_dict(), sort_keys=True, indent=4)))

trial = Trial(
trial_number=auto_ml_container.trial_number,
hyperparams=hyperparams,
save_trial_function=self.save_trial,
logger=logger,
cache_folder=self.cache_folder,
main_metric_name=auto_ml_container.main_scoring_metric_name
)
return trial


def _get_trial_hash(self, hp_dict):
"""
Hash hyperparams with md5 to create a trial hash.
@@ -187,19 +189,6 @@ def _get_trial_hash(self, hp_dict):
current_hyperparameters_hash = hashlib.md5(str.encode(str(hp_dict))).hexdigest()
return current_hyperparameters_hash
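A standalone, hedged sketch of the md5 trial hash computed above, applied to a hypothetical hyperparameter dict (not part of this commit):

import hashlib

hp_dict = {'learning_rate': 0.01, 'n_estimators': 100}  # hypothetical hyperparams, for illustration only
current_hyperparameters_hash = hashlib.md5(str.encode(str(hp_dict))).hexdigest()
print(current_hyperparameters_hash)  # deterministic 32-character hex digest for these hyperparams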

def _create_logger_for_trial(self, trial_number) -> logging.Logger:

os.makedirs(self.cache_folder, exist_ok=True)

logfile_path = os.path.join(self.cache_folder, f"trial_{trial_number}.log")
logger_name = f"trial_{trial_number}"
logger = logging.getLogger(logger_name)
formatter = logging.Formatter(fmt=LOGGER_FORMAT, datefmt=DATE_FORMAT)
file_handler = logging.FileHandler(filename=logfile_path)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger


class InMemoryHyperparamsRepository(HyperparamsRepository):
"""
@@ -329,14 +318,14 @@ def _save_trial(self, trial: 'Trial'):
# Sleeping to have a valid time difference between files when reloading them to sort them by creation time:
time.sleep(0.1)

def new_trial(self, auto_ml_container: 'AutoMLContainer'):
def new_trial(self, auto_ml_container: 'AutoMLContainer') -> Trial:
"""
Create a new hyperparams trial JSON file.
:param auto_ml_container: auto ml container
:return:
"""
trial = HyperparamsRepository.new_trial(self, auto_ml_container)
trial: Trial = HyperparamsRepository.new_trial(self, auto_ml_container)
self._save_trial(trial)

return trial
@@ -346,6 +335,7 @@ def load_all_trials(self, status: 'TRIAL_STATUS' = None) -> 'Trials':
Load all hyperparameter trials with their corresponding score.
Reads all the saved trial json files, sorted by creation date.
:param status: (optional) filter to select only trials with this status.
:return: (hyperparams, scores)
"""
trials = Trials()
Expand All @@ -370,7 +360,8 @@ def getmtimens(filename):
if status is None or trial_json['status'] == status.value:
trials.append(Trial.from_json(
update_trial_function=self.save_trial,
trial_json=trial_json
trial_json=trial_json,
cache_folder=self.cache_folder
))

return trials
@@ -498,7 +489,14 @@ def __init__(
hyperparams_repository = InMemoryHyperparamsRepository()
self.hyperparams_repository: HyperparamsRepository = hyperparams_repository

def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context: ExecutionContext = None) -> Trial:
def train(
self,
pipeline: BaseStep,
data_inputs,
expected_outputs=None,
context: ExecutionContext = None,
trial_number=0
) -> Trial:
"""
Train pipeline using the validation splitter.
Track training, and validation metrics for each epoch.
@@ -523,12 +521,12 @@ def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context:
logger=context.logger,
hyperparams=pipeline.get_hyperparams(),
main_metric_name=self.get_main_metric_name(),
save_trial_function=self.hyperparams_repository.save_trial
save_trial_function=self.hyperparams_repository.save_trial,
trial_number=trial_number
)

self.execute_trial(
pipeline=pipeline,
trial_number=1,
repo_trial=repo_trial,
context=context,
validation_splits=validation_splits,
@@ -541,7 +539,6 @@ def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context:
def execute_trial(
self,
pipeline: BaseStep,
trial_number: int,
repo_trial: Trial,
context: ExecutionContext,
validation_splits: List[Tuple[DataContainer, DataContainer]],
@@ -576,7 +573,7 @@ def execute_trial(
repo_trial=repo_trial,
repo_trial_split_number=repo_trial_split.split_number,
validation_splits=validation_splits,
trial_number=trial_number,
trial_number=repo_trial.trial_number,
n_trial=n_trial
)

@@ -867,7 +864,6 @@ def _attempt_trial(self, trial_number, validation_splits, context: ExecutionCont
repo_trial_split = self.trainer.execute_trial(
pipeline=self.pipeline,
context=context,
trial_number=trial_number,
repo_trial=repo_trial,
validation_splits=validation_splits,
n_trial=self.n_trial
@@ -1153,8 +1149,9 @@ class ValidationSplitter(BaseValidationSplitter):
def __init__(self, test_size: float):
self.test_size = test_size

def split(self, data_inputs, current_ids=None, expected_outputs=None, context: ExecutionContext = None) -> Tuple[
List, List, List, List]:
def split(
self, data_inputs, current_ids=None, expected_outputs=None, context: ExecutionContext = None
) -> Tuple[List, List, List, List]:
train_data_inputs, train_expected_outputs, train_current_ids, validation_data_inputs, validation_expected_outputs, validation_current_ids = validation_split(
test_size=self.test_size,
data_inputs=data_inputs,