Skip to content

Commit

Permalink
Merge pull request #27 from twollnik/multiple_pollutants_per_run
Browse files Browse the repository at this point in the history
Multiple pollutants per run
  • Loading branch information
twollnik authored Jul 4, 2019
2 parents c54d669 + 58c957e commit 2180513
Show file tree
Hide file tree
Showing 39 changed files with 385 additions and 280 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ We have included example configuration files and example data for you to try out
in the folder ``example/``. To run the demo, execute the following command on the command line from the
repository root directory: ``python -m run_yeti -c example/example_configs/copert_hot_config.yaml``. Instead of the
``copert_hot_config.yaml`` you can use any of the
`config files <https://iass-yeti.readthedocs.io/en/latest/user/config.html/>`_ in ``example/example_configs/``.
`config files <https://iass-yeti.readthedocs.io/en/latest/user/config.html>`_ in ``example/example_configs/``.

.. demo-end-do-not-remove
.. usage-start-do-not-remove
Expand Down
6 changes: 3 additions & 3 deletions code/Model.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def run(self, config_dict, config_file):
self.log_step("Loading unified_data.")
unified_data_dataframes = self.load_unified_data(unified_file_locations)

self.log_step(f"Calculating emissions for pollutant {self.config_dict['pollutant']}.")
self.log_step("Calculating emissions.")
self.calculate_emissions_with_user_defined_strategy(unified_data_dataframes)

self.log_step("Creating 'run_info.txt'.")
Expand Down Expand Up @@ -93,15 +93,15 @@ def import_user_defined_functions(self):

def log_run_information(self):

pollutant = self.config_dict["pollutant"]
pollutants = self.config_dict["pollutants"]
strategy = self.config_dict["strategy"]
load_input_data_function = self.config_dict["load_input_data_function"]
load_unified_data_function = self.config_dict["load_unified_data_function"]
validation_function = self.config_dict.get("validation_function")

logging.info(
f"The config file for this run is: {os.path.abspath(self.config_file)}\n"
f"pollutant: {pollutant}\n"
f"pollutants: {pollutants}\n"
f"mode: {self.mode}\n"
f"strategy: {strategy}\n"
f"load_input_data_function: {load_input_data_function}\n"
Expand Down
6 changes: 1 addition & 5 deletions code/StrategyInvoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def calculate_and_save_emissions(self, emissions_output_folder, save_interval_in
self.save_emissions()

self.save_emissions()
return self.output_file

def initialize(self, emissions_output_folder, **kwargs):

Expand Down Expand Up @@ -103,12 +102,9 @@ def initialize_vehicle_dict(self):

def get_output_file_name(self, output_folder):

timestamp = datetime.now().strftime("%Y-%m-%d_%Hh-%Mmin")
output_file = f"{output_folder}/emissions_{self.pollutant}_{timestamp}.csv"

output_file = f"{output_folder}/emissions.csv"
if not os.path.isdir(output_folder):
os.mkdir(output_folder)

return output_file

def traffic_and_link_data_rows(self):
Expand Down
54 changes: 33 additions & 21 deletions code/copert_cold_strategy/CopertColdStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
This module implements emission calculation with the COPERT methodology for cold start emissions.
"""
from collections import defaultdict
from typing import Any, Dict, Iterable, Tuple, Union
from typing import Any, Dict, Iterable, Tuple, Union, List
import collections

import pandas as pd

Expand Down Expand Up @@ -61,10 +61,12 @@ def __init__(self):
self.row = None
self.hot_ef_dict = {}

self.emissions = collections.defaultdict(dict)

def calculate_emissions(self,
traffic_and_link_data_row: Dict[str, Any],
vehicle_dict: Dict[str, str],
pollutant: str,
pollutants: List[str],
**kwargs):
"""
required kwargs:
Expand All @@ -85,19 +87,25 @@ def calculate_emissions(self,

self.initialize_if_necessary(kwargs, vehicle_dict)
self.store_row_data_in_attribute(traffic_and_link_data_row)
self.delete_emissions_from_last_call_to_this_function()

hot_emissions = self.calculate_hot_emissions(pollutant)
hot_ef_dict = self.get_hot_ef_from_hot_emissions(hot_emissions)
hot_emissions = self.calculate_hot_emissions(pollutants)

if self.should_exclude_link_from_cold_emission_calculation(**kwargs) or self.temperature_is_very_high():
cold_emissions = self.zero_emissions_for_all_vehicles()
else:
cold_emissions = self.calculate_cold_emissions(pollutant, hot_ef_dict)
for pollutant in pollutants:

total_emissions = self.calculate_total_emissions(hot_emissions, cold_emissions)
emissions = self.join_emissions_into_one_dict(hot_emissions, cold_emissions, total_emissions)
hot_emissions_for_pollutant = hot_emissions[pollutant]
hot_ef_dict = self.get_hot_ef_from_hot_emissions(hot_emissions_for_pollutant)

if self.should_exclude_link_from_cold_emission_calculation(**kwargs) or self.temperature_is_very_high():
cold_emissions = self.zero_emissions_for_all_vehicles()
else:
cold_emissions = self.calculate_cold_emissions(pollutant, hot_ef_dict)

total_emissions = self.calculate_total_emissions(hot_emissions_for_pollutant, cold_emissions)
self.add_emissions_to_assembly_attribute(pollutant, hot_emissions_for_pollutant, cold_emissions, total_emissions)

return emissions

return self.emissions

def initialize_if_necessary(self, kwargs, vehicle_dict):

Expand Down Expand Up @@ -153,7 +161,7 @@ def initialize_max_min_cold_ef_stats_if_necessary(self):
def initialize_ABC_dict_if_necessary(self, cold_ef_table: pd.DataFrame):

if self.ABC_dict is None:
ABC_dict = defaultdict(list)
ABC_dict = collections.defaultdict(list)
for row in cold_ef_table.fillna(1000).itertuples():
if row.MinTemp <= self.temperature <= row.MaxTemp:
ABC_dict[(row.VehSegment, row.Pollutant)].append(row)
Expand Down Expand Up @@ -207,6 +215,10 @@ def store_row_data_in_attribute(self, row: Dict[str, Any]):

self.row = row

def delete_emissions_from_last_call_to_this_function(self):

self.emissions = collections.defaultdict(dict)

def get_hot_ef_from_hot_emissions(self, hot_emissions: Dict[str, float]) -> Dict[str, float]:
"""Calculate the hot ef using the formula 'hot ef = hot emissions / link length / vehicle count' """

Expand All @@ -221,16 +233,16 @@ def get_hot_ef_from_hot_emissions(self, hot_emissions: Dict[str, float]) -> Dict

return hot_ef

def join_emissions_into_one_dict(
self, hot_emissions: Dict[str, float],
def add_emissions_to_assembly_attribute(
self,
pollutant: str,
hot_emissions: Dict[str, float],
cold_emissions: Dict[str, float],
total_emissions: Dict[str, float]) -> Dict[str, Dict[str, float]]:

return {
"cold": cold_emissions,
"hot": hot_emissions,
"total": total_emissions
}
self.emissions[f"{pollutant}_hot"] = hot_emissions
self.emissions[f"{pollutant}_cold"] = cold_emissions
self.emissions[f"{pollutant}_total"] = total_emissions

def should_exclude_link_from_cold_emission_calculation(self, **kwargs) -> bool:

Expand Down Expand Up @@ -297,7 +309,7 @@ def add_vehicle_to_correct_lcv_group(self, veh_euro: str, veh_fuel: str, veh_nam
else:
self.vehicles_lcv_diesel.append(veh_name)

def calculate_hot_emissions(self, pollutant: str) -> Dict[str, float]:
def calculate_hot_emissions(self, pollutant: str) -> Dict[str, Dict[str, float]]:

return self.hot_strategy.calculate_emissions(
self.row, self.vehicle_dict, pollutant, los_speeds_data=self.los_speeds_data,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
- a fixed speed given in the configuration file or
- a fixed speed for each link given in the link data
"""
from typing import Any, Dict
from typing import Any, Dict, List
import collections

from code.copert_hot_strategy.CopertHotStrategy import CopertHotStrategy

Expand Down Expand Up @@ -38,20 +39,21 @@ def __init__(self, **kwargs):
if ef_data is not None:
self.ef_dict = self.get_ef_dict(ef_data)

self.emissions = {}
self.emissions = collections.defaultdict(dict)

def calculate_emissions(self,
traffic_and_link_data_row: Dict[str, Any],
vehicle_dict: Dict[str, str],
pollutant: str,
pollutants: List[str],
**kwargs):

self.initialize_if_necessary(kwargs)
self.delete_emissions_from_last_call_to_this_function()

for vehicle_name, vehicle_category in vehicle_dict.items():
self.calculate_emissions_for_vehicle(
traffic_and_link_data_row, vehicle_name, vehicle_category, pollutant, **kwargs)
for pollutant in pollutants:
for vehicle_name, vehicle_category in vehicle_dict.items():
self.calculate_emissions_for_vehicle(
traffic_and_link_data_row, vehicle_name, vehicle_category, pollutant, **kwargs)

return self.emissions

Expand Down Expand Up @@ -83,4 +85,4 @@ def calculate_emissions_for_vehicle(

emissions = ef * float(traffic_and_link_data_row["Length"]) * float(traffic_and_link_data_row[vehicle_name])

self.emissions[vehicle_name] = emissions
self.emissions[pollutant][vehicle_name] = emissions
18 changes: 10 additions & 8 deletions code/copert_hot_strategy/CopertHotStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
- the los percentages and
- the los speeds attributed to the links.
"""
from typing import Any, Dict
from typing import Any, Dict, List
import collections

import pandas as pd

Expand Down Expand Up @@ -41,20 +42,21 @@ def __init__(self, **kwargs):
self.los_speeds_dict = None
self.ef_dict = None

self.emissions = {}
self.emissions = collections.defaultdict(dict)

def calculate_emissions(self,
traffic_and_link_data_row: Dict[str, Any],
vehicle_dict: Dict[str, str],
pollutant: str,
pollutants: List[str],
**kwargs):

self.initialize_if_necessary(kwargs)
self.delete_emissions_from_last_call_to_this_function()

for vehicle_name, vehicle_category in vehicle_dict.items():
self.calculate_emissions_for_vehicle(
traffic_and_link_data_row, vehicle_name, vehicle_category, pollutant, **kwargs)
for pollutant in pollutants:
for vehicle_name, vehicle_category in vehicle_dict.items():
self.calculate_emissions_for_vehicle(
traffic_and_link_data_row, vehicle_name, vehicle_category, pollutant, **kwargs)

return self.emissions

Expand Down Expand Up @@ -89,7 +91,7 @@ def get_ef_dict(self, ef_data: pd.DataFrame):

def delete_emissions_from_last_call_to_this_function(self):

self.emissions = {}
self.emissions = collections.defaultdict(dict)

def calculate_emissions_for_vehicle(
self, traffic_and_link_data_row, vehicle_name, vehicle_category, pollutant, **kwargs):
Expand All @@ -106,7 +108,7 @@ def calculate_emissions_for_vehicle(
vehicle_count = traffic_and_link_data_row[vehicle_name]
emissions = ef * link_length * vehicle_count

self.emissions[vehicle_name] = emissions
self.emissions[pollutant][vehicle_name] = emissions

def get_ef(self, row: Dict[str, Any], vehicle_name: str, pollutant: str, los_speeds: Dict[str, float]) -> float:

Expand Down
19 changes: 10 additions & 9 deletions code/hbefa_hot_strategy/HbefaHotStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
This Module implements emission calculation with the HBEFA methodology for hot exhaust emissions.
It works with emission factors that are dependent on the vehicle and the traffic situation.
"""
from typing import Any, Dict
from typing import Any, Dict, List
import collections

from code.constants.mappings import ROAD_CAT_FROM_ENUM

Expand Down Expand Up @@ -33,23 +34,23 @@ class HbefaHotStrategy:
def __init__(self, **kwargs):

self.ef_dict = None
self.emissions = {}
self.emissions = collections.defaultdict(dict)

def calculate_emissions(self,
traffic_and_link_data_row: Dict[str, Any],
vehicle_dict: Dict[str, str],
pollutant: str,
pollutants: List[str],
**kwargs):


self.initialize_if_necessary(**kwargs)
self.delete_emissions_from_last_call_to_this_function()

traffic_situation = self.get_traffic_situation(traffic_and_link_data_row)

for vehicle_name, vehicle_category in vehicle_dict.items():
self.calculate_emissions_for_vehicle(
traffic_and_link_data_row, vehicle_name, traffic_situation, pollutant, **kwargs)
for pollutant in pollutants:
for vehicle_name, vehicle_category in vehicle_dict.items():
self.calculate_emissions_for_vehicle(
traffic_and_link_data_row, vehicle_name, traffic_situation, pollutant, **kwargs)

return self.emissions

Expand All @@ -72,7 +73,7 @@ def initialize_ef_data(self, kwargs):

def delete_emissions_from_last_call_to_this_function(self):

self.emissions = {}
self.emissions = collections.defaultdict(dict)

def calculate_emissions_for_vehicle(self, traffic_and_link_data_row, vehicle_name, traffic_situation, pollutant,
**kwargs):
Expand All @@ -91,7 +92,7 @@ def calculate_emissions_for_vehicle(self, traffic_and_link_data_row, vehicle_nam
ef += traffic_and_link_data_row[los_perc_col] * ef_for_los

emissions = ef * traffic_and_link_data_row["Length"] * traffic_and_link_data_row[vehicle_name]
self.emissions[vehicle_name] = emissions
self.emissions[pollutant][vehicle_name] = emissions

def get_traffic_situation(self, row) -> str:

Expand Down
4 changes: 2 additions & 2 deletions code/pm_non_exhaust_strategy/PMNonExhaustStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
This module implements emission calculation for PM from non-exhaust emissions.
Sources for PM non-exhaust emissions are tyre wear, brake wear and road surface emissions.
"""
from typing import Any, Dict, Iterable, Tuple
from typing import Any, Dict, Iterable, Tuple, List


class PMNonExhaustStrategy:
Expand Down Expand Up @@ -50,7 +50,7 @@ def __init__(self):
def calculate_emissions(self,
traffic_and_link_data_row: Dict[str, Any],
vehicle_dict: Dict[str, str],
pollutant: str,
pollutants: List[str],
**kwargs) -> Dict[str, Dict[str, float]]:

self.initialize_if_necessary(vehicle_dict, **kwargs)
Expand Down
14 changes: 4 additions & 10 deletions code/script_helpers/create_info_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,9 @@

def create_info_file(file: str, timestamp, mode, start_time, end_time, config_dict):

info_text = get_info_text_by_mode(mode, timestamp, start_time, end_time, config_dict)
with open(file, "w") as fp:
fp.write(info_text)


def get_info_text_by_mode(mode, timestamp, start_time, end_time, config_dict):

newline = "\n"

pollutant = config_dict["pollutant"]
pollutants = config_dict["pollutants"]
strategy = config_dict["strategy"]
load_input_data_function = config_dict["load_input_data_function"]
load_unified_data_function = config_dict["load_unified_data_function"]
Expand All @@ -28,7 +21,7 @@ def get_info_text_by_mode(mode, timestamp, start_time, end_time, config_dict):
"\n"
f"time of run: {timestamp}\n"
f"duration of run: {(end_time - start_time) / 60} min\n"
f"pollutant: {pollutant}\n"
f"pollutants: {pollutants}\n"
f"output folder: {output_folder}\n"
f"unified data output folder: {config_dict.get('output_folder_for_unified_data')}\n"
f"use nh3 tier 2 ef: {use_nh3_tier2_ef}\n"
Expand Down Expand Up @@ -60,4 +53,5 @@ def get_info_text_by_mode(mode, timestamp, start_time, end_time, config_dict):
else:
raise RuntimeError(f"mode {mode} is not recognized. Please use 'unified_data' or 'input_data'.")

return info_text
with open(file, "w") as fp:
fp.write(info_text)
Loading

0 comments on commit 2180513

Please sign in to comment.