Merge pull request #19 from sedos-project/release/v0.10.0

Release v0.10.0

henhuy authored Apr 26, 2023
2 parents beac3e3 + 988f905 commit 33fe518

Showing 28 changed files with 2,064 additions and 78 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,13 @@ Here is a template for new release sections
 -
 ```

+## [0.10.0] - 2023-04-26
+### Added
+- allow multiple (sub-)processes per table
+
+### Changed
+- collection metadata can contain multiple names and subjects per artifact
+
 ## [0.9.1] - 2023-04-21
 ### Fixed
 - links in `get_process` function
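
For illustration, a minimal sketch of what "multiple (sub-)processes per table" means in practice: several sub-processes share one table and are distinguished by a `type` column. All file, column, and process names below are made up for this example.

```python
# Sketch: one scalar table holding two sub-processes, split by a "type" column.
import io

import pandas

csv = io.StringIO(
    "type,region,installed_capacity\n"
    "wind_onshore,DE,120\n"
    "wind_offshore,DE,80\n"
)
df = pandas.read_csv(csv)

# Selecting one sub-process and dropping the marker column mirrors the new
# Adapter.__filter_subprocess helper added in preprocessing.py below:
onshore = df[df["type"] == "wind_onshore"].drop("type", axis=1)
print(onshore)
```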
2 changes: 1 addition & 1 deletion data_adapter/__init__.py
@@ -1 +1 @@
-version = "0.9.1"
+version = "0.10.0"
97 changes: 81 additions & 16 deletions data_adapter/collection.py
@@ -4,7 +4,9 @@
 from enum import IntEnum
 from typing import List, Optional, Union

-from data_adapter import core, settings
+import pandas
+
+from data_adapter import core, ontology, settings


 class CollectionError(Exception):
@@ -22,13 +24,25 @@ class Artifact:
     group: str
     artifact: str
     version: str
-    filename: str
-    subject: Optional[str]
-    datatype: DataType
+    filename: Optional[str] = None
+    datatype: DataType = DataType.Scalar
+    multiple_types: bool = False

-    def path(self):
+    @property
+    def path(self) -> pathlib.Path:
         return settings.COLLECTIONS_DIR / self.collection / self.group / self.artifact / self.version

+    @property
+    def metadata(self) -> dict:
+        with open(self.path / self.get_filename(".json"), "r", encoding="utf-8") as metadata_file:
+            return json.load(metadata_file)
+
+    def get_filename(self, suffix: str) -> str:
+        for file in self.path.iterdir():
+            if file.suffix == suffix:
+                return file.name
+        raise FileNotFoundError(f"Artifact file with {suffix=} not found.")


 def check_collection_meta(collection_meta: dict):
     """
@@ -49,30 +63,62 @@ def check_collection_meta(collection_meta: dict):
     # Check if artifact info keys are missing:
     for group, artifacts in collection_meta["artifacts"].items():
         for artifact, artifact_infos in artifacts.items():
-            for key in ("latest_version", "subject", "datatype"):
+            for key in ("latest_version",):
                 if key not in artifact_infos:
                     raise CollectionError(
                         f"Collection metadata is invalid ({group=}, {artifact=} misses {key=}). "
                         "Collection metadata may changed. Please re-download collection and try again."
                     )


-def get_metadata_from_artifact(artifact: Artifact) -> dict:
+def infer_collection_metadata(collection: str, collection_meta: dict) -> dict:
     """
-    Returns metadata from given artifact.
+    Infers names and subjects of artifacts from downloaded collection and updates collection metadata

     Parameters
     ----------
-    artifact: Artifact
-        Artifact to get metadata from
+    collection: str
+        Name of collection
+    collection_meta : dict
+        Metadata of collection to be updated

     Returns
     -------
     dict
-        Metadata from artifact
+        Updated collection metadata
     """
-    with open(artifact.path() / f"{artifact.filename}.json", "r", encoding="utf-8") as metadata_file:
-        return json.load(metadata_file)
+    for group_name, artifacts in collection_meta["artifacts"].items():
+        for artifact_name in artifacts:
+            version = collection_meta["artifacts"][group_name][artifact_name]["latest_version"]
+
+            artifact = Artifact(collection, group_name, artifact_name, version)
+            metadata = artifact.metadata
+
+            # Check if artifact contains multiple (sub-)processes
+            try:
+                type_field = [
+                    field for field in metadata["resources"][0]["schema"]["fields"] if field["name"] == "type"
+                ][0]
+            except IndexError:
+                type_field = None
+
+            if type_field:
+                collection_meta["artifacts"][group_name][artifact_name]["multiple_types"] = True
+                collection_meta["artifacts"][group_name][artifact_name]["names"] = get_subprocesses_from_artifact(
+                    artifact
+                )
+                collection_meta["artifacts"][group_name][artifact_name]["subjects"] = [
+                    ontology.get_name_from_annotation(value_reference)
+                    for value_reference in type_field["valueReference"]
+                ]
+            else:
+                collection_meta["artifacts"][group_name][artifact_name]["multiple_types"] = False
+                collection_meta["artifacts"][group_name][artifact_name]["names"] = [metadata["name"]]
+                collection_meta["artifacts"][group_name][artifact_name]["subjects"] = [ontology.get_subject(metadata)]
+
+    return collection_meta


 def get_data_type(metadata: Union[str, pathlib.Path, dict]):
@@ -83,6 +129,25 @@ def get_data_type(metadata: Union[str, pathlib.Path, dict]):
         return DataType.Scalar


+def get_subprocesses_from_artifact(artifact: Artifact) -> List[str]:
+    """
+    Return list of subprocesses for given artifact
+
+    Returns entries of column "type" as list.
+
+    Parameters
+    ----------
+    artifact: Artifact
+        Artifact to read type column from
+
+    Returns
+    -------
+    List[str]
+        List of subprocesses in given artifact
+    """
+    return list(pandas.read_csv(artifact.path / artifact.get_filename(".csv"), usecols=("type",))["type"])


 def get_collection_meta(collection: str) -> dict:
     """
     Returns collection meta file if present
@@ -134,8 +199,8 @@ def get_artifacts_from_collection(collection: str, process: Optional[str] = None
     artifacts = []
     for group in collection_meta["artifacts"]:
         for artifact, artifact_info in collection_meta["artifacts"][group].items():
-            process_name = artifact_info["subject"] if settings.USE_ANNOTATIONS else artifact_info["name"]
-            if process and process_name != process:
+            process_names = artifact_info["subjects"] if settings.USE_ANNOTATIONS else artifact_info["names"]
+            if process and process not in process_names:
                 continue
             filename = artifact
             artifacts.append(
@@ -145,8 +210,8 @@
                     artifact,
                     artifact_info["latest_version"],
                     filename,
-                    subject=process,
                     datatype=DataType(artifact_info["datatype"]),
+                    multiple_types=artifact_info["multiple_types"],
                 )
             )
     return artifacts
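
A short usage sketch of the new inference step, assuming a collection named "subprocesses" has already been downloaded to COLLECTIONS_DIR (the group and artifact names follow the new test at the bottom of this diff):

```python
from data_adapter import collection

# Assumes the "subprocesses" collection exists locally under COLLECTIONS_DIR.
meta = collection.get_collection_meta("subprocesses")
meta = collection.infer_collection_metadata("subprocesses", meta)

# Per artifact, the inferred keys are now plural lists plus a type flag:
info = meta["artifacts"]["modex"]["modex_tech_wind_turbine"]
print(info["multiple_types"])  # True, since this table has a "type" field
print(info["names"])           # e.g. ["wind_onshore", "wind_offshore"]
print(info["subjects"])        # e.g. ["Wind Onshore", "Wind Offshore"]
```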
13 changes: 3 additions & 10 deletions data_adapter/databus.py
@@ -9,8 +9,7 @@
 import requests
 from SPARQLWrapper import JSON, SPARQLWrapper2

-import data_adapter.collection
-from data_adapter import core, ontology, settings
+from data_adapter import collection, settings


 def download_artifact(artifact_file: str, filename: Union[pathlib.Path, str]):
@@ -159,14 +158,15 @@ def download_collection(collection_url: str, force_download=False):
         collection_dir.mkdir()

     if collection_meta.get("version", None) != settings.COLLECTION_META_VERSION:
-        raise data_adapter.collection.CollectionError(
+        raise collection.CollectionError(
             "Collection metadata has changed. Please remove collection and re-download. "
             "Otherwise, strange behaviours could occur."
         )

     artifacts = get_artifacts_from_collection(collection_url)
     artifact_versions = {artifact: get_latest_version_of_artifact(artifact) for artifact in artifacts}
     __download_artifacts(artifact_versions, collection_dir, collection_meta, force_download)
+    collection_meta = collection.infer_collection_metadata(collection_name, collection_meta)

     with open(collection_dir / settings.COLLECTION_JSON, "w", encoding="utf-8") as collection_json_file:
         json.dump(collection_meta, collection_json_file)
@@ -224,11 +224,4 @@ def __download_artifacts(
             suffix = artifact_filename.split(".")[-1]
             filename = f"{artifact_name}.{suffix}"
             download_artifact(artifact_filename, version_dir / filename)
-            if suffix == "json":
-                metadata = core.get_metadata(version_dir / filename)
-                collection_meta["artifacts"][group_name][artifact_name]["name"] = metadata["name"]
-                collection_meta["artifacts"][group_name][artifact_name]["subject"] = ontology.get_subject(metadata)
-                collection_meta["artifacts"][group_name][artifact_name][
-                    "datatype"
-                ] = data_adapter.collection.get_data_type(metadata)
             logging.info(f"Downloaded {artifact_name=} {version=}.")
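Sketched effect of the reordered flow (the databus URL is illustrative): metadata inference now runs once after all artifacts are downloaded, instead of per downloaded JSON file.

```python
from data_adapter.databus import download_collection

# Illustrative URL. After downloading all artifacts, download_collection now
# calls collection.infer_collection_metadata(...) once and then writes the
# enriched metadata to collection.json.
download_collection("https://databus.example.org/user/collections/simple")
```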
36 changes: 18 additions & 18 deletions data_adapter/ontology.py
@@ -1,4 +1,3 @@
-import logging
 import pathlib
 from collections import defaultdict
 from dataclasses import dataclass
@@ -28,7 +27,10 @@ def get_subject(metadata: Union[str, pathlib.Path, dict]) -> str:
     metadata_dict = core.get_metadata(metadata)
     if "subject" not in metadata_dict:
         return metadata_dict["name"]
-    return get_name_from_annotation(metadata_dict["subject"], default=metadata_dict["name"])
+    try:
+        return get_name_from_annotation(metadata_dict["subject"])
+    except AnnotationError:
+        return metadata_dict["name"]


 def get_name_from_ontology(oeo_path: str) -> str:
@@ -57,23 +59,22 @@ def get_name_from_ontology(oeo_path: str) -> str:
     raise AnnotationError(f"No ontology concept found for {oeo_path=}.")


-def get_name_from_annotation(annotation, default) -> str:
-    names = []
-    if not annotation:
-        return default
-    for entry in annotation:
-        if "path" in entry:
+def get_name_from_annotation(annotation) -> str:
+    def name_from_item(annotation_item):
+        if "path" in annotation_item:
             try:
-                names.append(get_name_from_ontology(entry["path"]))
-                continue
+                return get_name_from_ontology(annotation_item["path"])
             except AnnotationError:
                 pass
-        if "name" in entry and entry["name"]:
-            names.append(entry["name"])
-            continue
-        logging.warning(f"Could not read annotation ({annotation=}) for '{default}'.")
-        return default
-    return "_".join(names)
+        if "name" in annotation_item and annotation_item["name"]:
+            return annotation_item["name"]
+        raise AnnotationError(f"Could not get name from annotation ({annotation_item})")
+
+    if isinstance(annotation, dict):
+        return name_from_item(annotation)
+    if isinstance(annotation, list):
+        return "_".join(name_from_item(item) for item in annotation)
+    raise AnnotationError(f"Invalid {annotation=}. Must be either annotation dict or list of annotation dicts.")


 def __check_quality_of_annotation(annotation: list[dict[str, str]]) -> AnnotationQuality:
@@ -153,6 +154,5 @@ def check_annotations_in_collection(
     annotation_qualities: dict = defaultdict(list)
     artifacts = collection.get_artifacts_from_collection(collection_name)
     for artifact in artifacts:
-        metadata = collection.get_metadata_from_artifact(artifact)
-        annotation_qualities[artifact].extend(*check_annotations_in_metadata(metadata))
+        annotation_qualities[artifact].extend(*check_annotations_in_metadata(artifact.metadata))
     return annotation_qualities
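
A sketch of the reworked annotation handling, assuming `AnnotationError` and `get_name_from_annotation` are both importable from `data_adapter.ontology`: a single annotation dict yields one name, a list of dicts joins names with underscores, and unreadable input now raises instead of logging a warning and returning a default.

```python
from data_adapter.ontology import AnnotationError, get_name_from_annotation

# One annotation dict resolves to a single name:
assert get_name_from_annotation({"name": "Wind Onshore"}) == "Wind Onshore"

# A list of annotation dicts is joined with underscores:
assert get_name_from_annotation([{"name": "wind"}, {"name": "onshore"}]) == "wind_onshore"

# Invalid input raises instead of silently falling back to a default,
# which is exactly what the new try/except in get_subject catches:
try:
    get_name_from_annotation("not an annotation")
except AnnotationError as error:
    print(error)
```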
20 changes: 15 additions & 5 deletions data_adapter/preprocessing.py
@@ -82,7 +82,7 @@ def get_process(self, process: str) -> Process:
         scalar_dfs = []
         timeseries_df = []
         for artifact in artifacts:
-            df = self.__get_df_from_artifact(artifact)
+            df = self.__get_df_from_artifact(artifact, process)
             if artifact.datatype == collection.DataType.Scalar:
                 scalar_dfs.append(df)
             else:
@@ -98,7 +98,7 @@ def get_process(self, process: str) -> Process:
                 raise structure.StructureError(
                     f"Linked parameter for process '{process}' points to subject '{subject}' which is not unique."
                 )
-            df = self.__get_df_from_artifact(artifacts[0], *parameters)
+            df = self.__get_df_from_artifact(artifacts[0], subject, *parameters)
             if artifacts[0].datatype == collection.DataType.Scalar:
                 scalar_dfs.append(df)
             else:
@@ -153,7 +153,7 @@ def get_process_list(self) -> List[str]:
             "You have to init adapter class with structure name in order to use structure functions."
         )

-    def __get_df_from_artifact(self, artifact: collection.Artifact, *parameters: str):
+    def __get_df_from_artifact(self, artifact: collection.Artifact, process: str, *parameters: str):
         """
         Returns DataFrame from given artifact.
@@ -164,29 +164,39 @@ def __get_df_from_artifact(self, artifact: collection.Artifact, *parameters: str
         ----------
         artifact: Artifact
             Artifact to get DataFrame from
+        process: str
+            Process to filter (needed in case of multiple subprocesses)
         parameters: tuple[str]
             Parameters to filter DataFrame

         Returns
         -------
         pandas.DataFrame
         """
-        metadata = collection.get_metadata_from_artifact(artifact)
+        metadata = artifact.metadata
         fl_table_schema = core.reformat_oep_to_frictionless_schema(metadata["resources"][0]["schema"])
         resource = frictionless.Resource(
             name=metadata["name"],
             profile="tabular-data-resource",
-            source=artifact.path() / f"{artifact.filename}.csv",
+            source=artifact.path / f"{artifact.filename}.csv",
             schema=fl_table_schema,
             format="csv",
         )
         df = resource.to_pandas()

+        if artifact.multiple_types:
+            df = self.__filter_subprocess(df, process)
         if len(parameters) > 0:
             df = self.__filter_parameters(df, parameters, artifact.datatype)

         # Unpack regions:
         return df.explode("region")

+    @staticmethod
+    def __filter_subprocess(df: pandas.DataFrame, subprocess: str) -> pandas.DataFrame:
+        df = df[df["type"] == subprocess]
+        return df.drop("type", axis=1)
+
     @staticmethod
     def __filter_parameters(
         df: pandas.DataFrame, parameters: Iterable[str], datatype: collection.DataType
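
A hedged usage sketch of the extended filtering; the `Adapter` constructor arguments and the shape of the returned `Process` are assumptions from context ("init adapter class with structure name" above), not confirmed by this diff.

```python
from data_adapter.preprocessing import Adapter

# Assumed constructor: a collection name plus a structure name; both
# names here are illustrative.
adapter = Adapter("subprocesses", structure_name="my_structure")

# For a process stored in a multi-type artifact, __get_df_from_artifact now
# receives the process name, keeps only rows where type == "wind_onshore",
# and drops the "type" column before unpacking regions.
process = adapter.get_process("wind_onshore")
```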
2 changes: 1 addition & 1 deletion data_adapter/settings.py
@@ -24,7 +24,7 @@
     "change path to collection folder by changing environment variable 'COLLECTIONS_DIR'."
 )
 COLLECTION_JSON = "collection.json"
-COLLECTION_META_VERSION = "v2"
+COLLECTION_META_VERSION = "v3"

 STRUCTURES_DIR = (
     pathlib.Path(os.environ["STRUCTURES_DIR"]) if "STRUCTURES_DIR" in os.environ else pathlib.Path.cwd() / "structures"
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "data-adapter"
-version = "0.9.1"
+version = "0.10.0"
 description = "Provides general functionality for other data adapters"
 authors = ["Hendrik Huyskens <hendrik.huyskens@rl-institut.de>"]
 license = "AGPL-3.0 license"
13 changes: 13 additions & 0 deletions tests/test_collection.py
@@ -16,3 +16,16 @@ def test_filter_process_from_collection_with_annotations():
     with utils.turn_on_annotations():
         artifacts = collection.get_artifacts_from_collection("simple", "net capacity factor")
         assert len(artifacts) == 1
+
+
+def test_infer_collection_metadata():
+    collection_name = "subprocesses"
+    metadata = collection.get_collection_meta(collection_name)
+    collection.infer_collection_metadata(collection_name, metadata)
+    wind_turbine = metadata["artifacts"]["modex"]["modex_tech_wind_turbine"]
+    assert len(wind_turbine["names"]) == 2
+    assert "wind_onshore" in wind_turbine["names"]
+    assert "wind_offshore" in wind_turbine["names"]
+    assert len(wind_turbine["subjects"]) == 2
+    assert "Wind Onshore" in wind_turbine["subjects"]
+    assert "Wind Offshore" in wind_turbine["subjects"]