From 05134daf722a93cde72e770381fdceb4fba42f87 Mon Sep 17 00:00:00 2001 From: A Samuel Pottinger Date: Thu, 5 Dec 2024 18:01:58 +0000 Subject: [PATCH] Add preprocess outputs. --- sim_tasks.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/sim_tasks.py b/sim_tasks.py index a087f92b..1b70a799 100644 --- a/sim_tasks.py +++ b/sim_tasks.py @@ -1186,7 +1186,7 @@ def run(self): tasks_with_variations ) outputs_realized = map(lambda x: x.result(), outputs_all) - output_sets_realized.append(list(outputs_realized)) + output_sets_realized.append(self._preprocess_outputs(outputs_realized)) self._process_outputs(output_sets_realized) @@ -1277,6 +1277,17 @@ def get_iterations(self): Number of times to repeat simulations. """ return 1 + + def _preprocess_outputs(self, outputs): + """Preprocess outputs ahead of considering all iterations. + + Args: + outputs: Outputs from the cluster execution. + + Returns: + Collection or iterable of outputs. + """ + return outputs def _process_outputs(self, output_sets_realized): """Process and write outputs for the execution. @@ -1373,13 +1384,6 @@ def _process_outputs(self, output_sets_realized): """ assert len(output_sets_realized) == self.get_iterations() - def simplify_and_combine(outputs_realized): - simplified = map( - lambda x: self._simplify_record(x), - itertools.chain(*outputs_realized) - ) - return functools.reduce(lambda a, b: self._combine(a, b), simplified) - def get_stats(key, summaries): values = [x[key] for x in summaries] return { @@ -1387,10 +1391,9 @@ def get_stats(key, summaries): 'std': statistics.stdev(values) } - summaries = [simplify_and_combine(x) for x in output_sets_realized] - means = get_stats('mean', summaries) - probabilities = get_stats('probability', summaries) - severities = get_stats('severity', summaries) + means = get_stats('mean', output_sets_realized) + probabilities = get_stats('probability', output_sets_realized) + severities = get_stats('severity', output_sets_realized) output = {'mean': means, 'probability': probabilities, 'severity': severities} with self.output().open('w') as f: @@ -1441,6 +1444,21 @@ def get_weighted_avg(a_val, b_val, ignore_zero): 'severity': get_weighted_avg(a['severity'], b['severity'], True) } + def _preprocess_outputs(self, outputs): + """Preprocess outputs ahead of considering all iterations. + + Args: + outputs: Outputs from the cluster execution. + + Returns: + Collection or iterable of outputs. + """ + simplified = map( + lambda x: self._simplify_record(x), + itertools.chain(*outputs) + ) + return functools.reduce(lambda a, b: self._combine(a, b), simplified) + class NoopProjectHistoricTask(luigi.Task): """Task in which historic data are retroactively predicted for reference without model."""