Skip to content

Commit

Permalink
Add preprocess outputs.
Browse files Browse the repository at this point in the history
  • Loading branch information
sampottinger committed Dec 5, 2024
1 parent 57bddab commit 05134da
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions sim_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ def run(self):
tasks_with_variations
)
outputs_realized = map(lambda x: x.result(), outputs_all)
output_sets_realized.append(list(outputs_realized))
output_sets_realized.append(self._preprocess_outputs(outputs_realized))

self._process_outputs(output_sets_realized)

Expand Down Expand Up @@ -1277,6 +1277,17 @@ def get_iterations(self):
Number of times to repeat simulations.
"""
return 1

def _preprocess_outputs(self, outputs):
"""Preprocess outputs ahead of considering all iterations.
Args:
outputs: Outputs from the cluster execution.
Returns:
Collection or iterable of outputs.
"""
return outputs

def _process_outputs(self, output_sets_realized):
"""Process and write outputs for the execution.
Expand Down Expand Up @@ -1373,24 +1384,16 @@ def _process_outputs(self, output_sets_realized):
"""
assert len(output_sets_realized) == self.get_iterations()

def simplify_and_combine(outputs_realized):
simplified = map(
lambda x: self._simplify_record(x),
itertools.chain(*outputs_realized)
)
return functools.reduce(lambda a, b: self._combine(a, b), simplified)

def get_stats(key, summaries):
values = [x[key] for x in summaries]
return {
'mean': statistics.mean(values),
'std': statistics.stdev(values)
}

summaries = [simplify_and_combine(x) for x in output_sets_realized]
means = get_stats('mean', summaries)
probabilities = get_stats('probability', summaries)
severities = get_stats('severity', summaries)
means = get_stats('mean', output_sets_realized)
probabilities = get_stats('probability', output_sets_realized)
severities = get_stats('severity', output_sets_realized)
output = {'mean': means, 'probability': probabilities, 'severity': severities}

with self.output().open('w') as f:
Expand Down Expand Up @@ -1441,6 +1444,21 @@ def get_weighted_avg(a_val, b_val, ignore_zero):
'severity': get_weighted_avg(a['severity'], b['severity'], True)
}

def _preprocess_outputs(self, outputs):
"""Preprocess outputs ahead of considering all iterations.
Args:
outputs: Outputs from the cluster execution.
Returns:
Collection or iterable of outputs.
"""
simplified = map(
lambda x: self._simplify_record(x),
itertools.chain(*outputs)
)
return functools.reduce(lambda a, b: self._combine(a, b), simplified)


class NoopProjectHistoricTask(luigi.Task):
"""Task in which historic data are retroactively predicted for reference without model."""
Expand Down

0 comments on commit 05134da

Please sign in to comment.