Merge pull request #25 from sedos-project/release/v0.12.0

Release/v0.12.0

henhuy authored Nov 2, 2023
2 parents 2e81c2a + 991b996 commit 34928d6
Showing 18 changed files with 1,490 additions and 1,264 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md

@@ -17,6 +17,10 @@ Here is a template for new release sections
 -
 ```
 
+## [0.12.0] - 2023-11-02
+### Changed
+- foreign keys are given in datasets instead of links
+
 ## [0.11.2] - 2023-08-17
 ### Fixed
 - timeseries refactoring for multiple periods
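A note on the "foreign keys are given in datasets instead of links" entry: as of this release, a foreign key lives directly in a dataset's scalar data, as a string column whose single value follows the `<process>.<parameter>` pattern (see `_get_foreign_keys` in the preprocessing.py diff below). A minimal sketch, with hypothetical process and column names:

```python
import pandas as pd

# Scalar data for some process; the "demand" column is an FK candidate because
# it is a string column with exactly one unique, dotted value.
df = pd.DataFrame(
    {
        "region": ["DE", "FR"],
        "capacity": [100.0, 80.0],
        # "<process>.<parameter>": points at parameter "demand" of process "exo_steel"
        "demand": ["exo_steel.demand", "exo_steel.demand"],
    }
)
```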
2 changes: 1 addition & 1 deletion data_adapter/__init__.py

@@ -1 +1 @@
-version = "0.11.2"
+version = "0.12.0"
9 changes: 7 additions & 2 deletions data_adapter/collection.py

@@ -181,7 +181,9 @@ def get_collection_meta(collection: str) -> dict:
     return metadata
 
 
-def get_artifacts_from_collection(collection: str, process: Optional[str] = None) -> list[Artifact]:
+def get_artifacts_from_collection(
+    collection: str, process: Optional[str] = None, use_annotation: Optional[bool] = None
+) -> list[Artifact]:
     """Returns list of artifacts belonging to given process (subject).
 
     Parameters
@@ -190,17 +192,20 @@ def get_artifacts_from_collection(collection: str, process: Optional[str] = None
         Collection name
     process : Optional[str]
         Name of process to search collection metadata for. If not set, all artifacts will be returned.
+    use_annotation: Optional[bool]
+        If set, annotations are used (or not) according to the given value; otherwise the settings value USE_ANNOTATIONS is used.
 
     Returns
     -------
     List[ArtifactPath]
         List of artifacts in collection (belonging to given process, if set)
     """
+    use_annotation = settings.USE_ANNOTATIONS if use_annotation is None else use_annotation
    collection_meta = get_collection_meta(collection)
     artifacts = []
     for group in collection_meta["artifacts"]:
         for artifact, artifact_info in collection_meta["artifacts"][group].items():
-            process_names = artifact_info["subjects"] if settings.USE_ANNOTATIONS else artifact_info["names"]
+            process_names = artifact_info["subjects"] if use_annotation else artifact_info["names"]
             if process and process not in process_names:
                 continue
             filename = artifact
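For illustration, a usage sketch of the extended signature (the collection and process names are hypothetical): by default the global `USE_ANNOTATIONS` setting decides the matching mode, and `use_annotation` overrides it per call; the foreign-key lookup in preprocessing.py below calls it this way with `use_annotation=False`.

```python
from data_adapter import collection

# Default: settings.USE_ANNOTATIONS decides whether processes are matched
# by annotated subjects or by plain metadata names.
artifacts = collection.get_artifacts_from_collection("my_collection", process="ind_steel")

# Per-call override, ignoring the global setting (as used by FK resolution):
artifacts = collection.get_artifacts_from_collection(
    "my_collection", process="ind_steel", use_annotation=False
)
```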
87 changes: 60 additions & 27 deletions data_adapter/preprocessing.py

@@ -3,7 +3,7 @@
 import math
 import pathlib
 import warnings
-from collections import ChainMap
+from collections import ChainMap, namedtuple
 from dataclasses import dataclass
 from typing import Iterable, Optional

@@ -20,6 +20,8 @@
     "timeindex_resolution",
 ]
 
+ForeignKey = namedtuple("ForeignKey", ("process", "parameter"))
+
 
 @dataclass
 class Process:
@@ -34,9 +36,7 @@ class PreprocessingError(Exception):


 class Adapter:
-    def __init__(
-        self, collection_name: str, structure_name: Optional[str] = None, links_name: Optional[str] = None
-    ) -> None:
+    def __init__(self, collection_name: str, structure_name: Optional[str] = None) -> None:
         """The adapter is used to handle collection, structure and links centrally.
 
         Parameters
@@ -45,12 +45,9 @@ def __init__(
             Name of collection from collection folder to get data from
         structure_name : Optional[str]
             Name of structure in structure folder to read energysystem structure from
-        links_name : Optional[str]
-            Name of links in structure folder to read additional links for processes
         """
         self.collection_name = collection_name
         self.structure_name = structure_name
-        self.links_name = links_name

     def get_process(self, process: str) -> Process:
         """Loads data for given process from collection.
@@ -91,26 +88,31 @@ def get_process(self, process: str) -> Process:
         for artifact in artifacts:
             df = self.__get_df_from_artifact(artifact, process)
             if artifact.datatype == collection.DataType.Scalar:
+                # Handle foreign keys (only possible in scalar data)
+                foreign_keys = self._get_foreign_keys(process, df)
+                for fk_column, foreign_key in foreign_keys.items():
+                    artifacts = collection.get_artifacts_from_collection(
+                        self.collection_name, foreign_key.process, use_annotation=False
+                    )
+                    if not artifacts:
+                        continue  # no candidate
+                    if len(artifacts) > 1:
+                        raise structure.StructureError(
+                            f"Foreign key for process '{process}' points to subject '{foreign_key.process}' "
+                            "which is not unique.",
+                        )
+                    foreign_df = self.__get_df_from_artifact(artifacts[0], foreign_key.process, foreign_key.parameter)
+                    foreign_df = foreign_df.rename({foreign_key.parameter: fk_column}, axis=1)
+                    if artifacts[0].datatype == collection.DataType.Scalar:
+                        scalar_dfs.append(foreign_df)
+                    else:
+                        timeseries_df.append(foreign_df)
+                    # Remove FK column from original process
+                    df = df.drop(fk_column, axis=1)
                 scalar_dfs.append(df)
             else:
                 timeseries_df.append(df)
 
-        # Get dataframes for processes from additional parameters
-        if self.links_name:
-            for subject, parameters in structure.get_links_for_process(process, links_name=self.links_name).items():
-                artifacts = collection.get_artifacts_from_collection(self.collection_name, subject)
-                if not artifacts:
-                    raise structure.StructureError(f"Could not find linked parameter for {process=} and {subject=}.")
-                if len(artifacts) > 1:
-                    raise structure.StructureError(
-                        f"Linked parameter for process '{process}' points to subject '{subject}' which is not unique.",
-                    )
-                df = self.__get_df_from_artifact(artifacts[0], subject, *parameters)
-                if artifacts[0].datatype == collection.DataType.Scalar:
-                    scalar_dfs.append(df)
-                else:
-                    timeseries_df.append(df)
-
         return Process(
             self.__merge_parameters(*scalar_dfs, datatype=collection.DataType.Scalar),
             self.__refactor_timeseries(
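In practice this hunk changes the caller-facing contract: linked parameters are no longer wired up via a links file but are resolved from FK columns found in the scalar data itself. A migration sketch, with hypothetical collection, structure, and process names:

```python
from data_adapter.preprocessing import Adapter

# Before 0.12.0, links were passed explicitly:
# adapter = Adapter("my_collection", structure_name="my_structure", links_name="my_links")

# From 0.12.0 on, foreign keys are detected in the scalar data and resolved
# automatically during get_process():
adapter = Adapter("my_collection", structure_name="my_structure")
process = adapter.get_process("ind_steel")
```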
@@ -329,8 +331,41 @@ def __refactor_timeseries(timeseries_raw: pd.DataFrame) -> pd.DataFrame:
         merged_timeseries.columns.names = ("name", "region")
         return merged_timeseries
 
+    @staticmethod
+    def _get_foreign_keys(process: str, df: pd.DataFrame) -> dict[str, ForeignKey]:
+        """
+        Detect and check foreign keys in scalar data and return related columns and references
+
+        Parameters
+        ----------
+        process: str
+            Name of process
+        df
+            Process data in scalar format (only scalar data holds FKs)
+
+        Returns
+        -------
+        Dict of columns which hold foreign keys and related foreign key
+        """
+        # Column must contain strings
+        converted_df = df.convert_dtypes()
+        string_columns = set(converted_df.dtypes[converted_df.dtypes == "string"].index)
+        fk_column_candidates = string_columns - set(core.SCALAR_COLUMNS)
+        logging.info(f"Possible FK candidates for {process=}: {fk_column_candidates}")
+
+        # Check if FKs are unique (cannot have different FKs per process/subprocess)
+        fk_candidates = {}
+        for fk_column in fk_column_candidates:
+            if len(df[fk_column].unique()) > 1:
+                continue  # no candidate
+            fk = df[fk_column].iloc[0]
+            if "." not in fk:
+                continue  # no candidate
+            fk_candidates[fk_column] = ForeignKey(*fk.split("."))
+        return fk_candidates
+
+
-def get_process(collection_name: str, process: str, links: str) -> Process:
+def get_process(collection_name: str, process: str) -> Process:
     """Loads data for given process from collection. (Deprecated! Use Adapter class instead).
 
     Column headers are translated using ontology. Multiple dataframes per datatype are merged.
@@ -341,8 +376,6 @@ def get_process(collection_name: str, process: str, links: str) -> Process:
         Name of collection to get data from
     process : str
         Name of process (from subject or metadata name, depends on USE_ANNOTATIONS)
-    links : str
-        Name of file to get linked parameters from
 
     Returns
     -------
@@ -355,5 +388,5 @@ def get_process(collection_name: str, process: str, links: str) -> Process:
     )
     logging.warning(deprecated_msg)
     warnings.warn(deprecated_msg, DeprecationWarning)
-    adapter = Adapter(collection_name, structure_name=None, links_name=links)
+    adapter = Adapter(collection_name, structure_name=None)
     return adapter.get_process(process)
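To make the detection rules in `_get_foreign_keys` concrete, here is a self-contained sketch that mirrors its logic on a toy frame (the column values are made up; the real method additionally excludes `core.SCALAR_COLUMNS` and filters on `convert_dtypes()` string columns):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "demand": ["exo_steel.demand", "exo_steel.demand"],  # one unique dotted value -> FK
        "comment": ["a", "b"],  # more than one unique value -> no candidate
        "source": ["survey", "survey"],  # unique, but no dot -> no candidate
    }
)

for column in df.columns:
    values = df[column].unique()
    if len(values) != 1 or "." not in str(values[0]):
        continue  # no candidate
    process, parameter = values[0].split(".")
    print(f"{column} -> ForeignKey(process={process!r}, parameter={parameter!r})")
# demand -> ForeignKey(process='exo_steel', parameter='demand')
```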
25 changes: 0 additions & 25 deletions data_adapter/structure.py

@@ -1,8 +1,6 @@
 from __future__ import annotations
 
-import csv
 import re
-from collections import defaultdict, namedtuple
 
 import pandas as pd
 
@@ -18,29 +16,6 @@ class StructureError(Exception):
     """Raised if structure is corrupted."""
 
 
-Link = namedtuple("Link", ("linked_process", "parameter"))
-
-
-def get_links(links_name: str):
-    link_filename = settings.STRUCTURES_DIR / f"{links_name}.csv"
-    with open(link_filename, encoding="utf-8") as link_file:
-        link_csv = csv.DictReader(link_file, delimiter=";")
-        links = defaultdict(list)
-        for line in link_csv:
-            links[line["process"]].append(Link(line["table_name"], line["column_name"]))
-    return links
-
-
-def get_links_for_process(process: str, links_name: str):
-    links = get_links(links_name)
-    if process not in links:
-        return {}
-    parameters = defaultdict(list)
-    for parameter in links[process]:
-        parameters[parameter.linked_process].append(parameter.parameter)
-    return parameters
-
-
 def check_character_convention(dataframe: pd.DataFrame):
     """Check in parameter-, process-, input- and output-column for character convention.
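For anyone still carrying a links file: the removed `get_links` read a semicolon-separated CSV from `settings.STRUCTURES_DIR` with the columns `process`, `table_name` and `column_name`. A hypothetical row of such a file:

```
process;table_name;column_name
ind_steel;exo_steel;demand
```

The same link is now expressed by giving process `ind_steel` a scalar column `demand` whose value is `exo_steel.demand`, as shown in the CHANGELOG sketch above.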