
Commit

Merge branch 'main' into issue/80-uo-nan-values
Paul-B98 committed Nov 16, 2023
2 parents d46cb14 + 7b87de8 commit c2384af
Showing 7 changed files with 54 additions and 41 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/lint_and_test.yml
@@ -18,7 +18,7 @@ jobs:
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
-          pip install flake8 pytest_cov coveralls
+          pip install flake8 pytest_cov coveralls mypy
           if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
       - name: Lint with flake8
         run: |
@@ -32,4 +32,7 @@ jobs:
       - name: Coveralls
         run: coveralls
         env:
-          COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
+          COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
+      - name: Run mypy
+        run: |
+          mypy ./pyAKI/
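
The new "Run mypy" step type-checks the whole package on every CI run. A minimal, hypothetical sketch (function names invented for illustration) of the class of bug it catches, and which this commit fixes across the repo: Optional defaults that are later used as if they could never be None.

    from typing import Optional

    def resample_window(threshold: Optional[int] = 1) -> int:
        # mypy: Unsupported operand types for * ("None" and "int")
        return threshold * 60

    def resample_window_fixed(threshold: int = 1) -> int:
        # passes: threshold can no longer be None
        return threshold * 60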
2 changes: 1 addition & 1 deletion pyAKI/__init__.py
@@ -1,6 +1,6 @@
 from pyAKI.kdigo import Analyser

-from pbr.version import VersionInfo
+from pbr.version import VersionInfo  # type: ignore

 # Check the PBR version module docs for other options than release_string()
 __version__ = VersionInfo("pyAKI").release_string()
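
pbr ships no type stubs, so the new mypy step would flag this import; the trailing "# type: ignore" suppresses the error for this line only. A sketch of the failure mode (the exact message and error code depend on the mypy release in CI, so treat both as assumptions):

    # Without the comment, mypy reports something like:
    #   error: Skipping analyzing "pbr.version": module is installed,
    #   but missing library stubs or py.typed marker
    from pbr.version import VersionInfo  # type: ignore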
19 changes: 12 additions & 7 deletions pyAKI/kdigo.py
@@ -136,18 +136,23 @@ def process_stay(self, stay_id: str) -> pd.DataFrame:
             pd.DataFrame: The analysis results for the specific stay.
         """

-        data = [(name, data.loc[stay_id]) for name, data in self._data]
+        datasets: list[Dataset] = [
+            Dataset(dtype, data.loc[stay_id]) for dtype, data in self._data  # type: ignore
+        ]

         for probe in self._probes:
-            data: Dataset = probe.probe(data)
+            datasets = probe.probe(datasets)

-        (_, df), *datasets = data
+        (_, df), *datasets = datasets
         for _, _df in datasets:
             if isinstance(_df, pd.Series):
                 _df = pd.DataFrame([_df], index=df.index)
-            df: pd.DataFrame = df.merge(
-                _df, how="outer", left_index=True, right_index=True
-            )
+            df = df.merge(_df, how="outer", left_index=True, right_index=True)

         df["stage"] = df.filter(like="stage").max(axis=1)
-        return df.set_index([pd.Index([stay_id] * len(df), name="stay_id"), df.index])
+        return df.set_index(
+            pd.MultiIndex.from_arrays(
+                [[stay_id] * len(df), df.index.values],
+                names=(self._stay_identifier, df.index.name),
+            )
+        )
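
Passing a list of arrays to set_index also yields a MultiIndex, but the explicit pd.MultiIndex.from_arrays call lets the outer level take its name from self._stay_identifier instead of the hard-coded "stay_id". A toy sketch of the resulting index (column and identifier names assumed):

    import pandas as pd

    df = pd.DataFrame(
        {"stage": [0.0, 1.0]},
        index=pd.date_range("2023-01-01", periods=2, freq="1H", name="charttime"),
    )
    stay_id = "stay_0"

    indexed = df.set_index(
        pd.MultiIndex.from_arrays(
            [[stay_id] * len(df), df.index.values],
            names=("stay_id", df.index.name),
        )
    )
    print(indexed.index.names)  # ['stay_id', 'charttime']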
20 changes: 10 additions & 10 deletions pyAKI/preprocessors.py
@@ -68,7 +68,7 @@ def __init__(
         stay_identifier: str = "stay_id",
         time_identifier: str = "charttime",
         interpolate: bool = True,
-        threshold: Optional[int] = 1,
+        threshold: int = 1,
     ) -> None:
         """
         Initialize a new instance of the UrineOutputPreProcessor class.
@@ -82,11 +82,11 @@ def __init__(
         super().__init__(stay_identifier, time_identifier)

         self._interpolate: bool = interpolate
-        self._threshold: Optional[int] = threshold
+        self._threshold: int = threshold

     @dataset_as_df(df=DatasetType.URINEOUTPUT)
     @df_to_dataset(DatasetType.URINEOUTPUT)
-    def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
+    def process(self, df: pd.DataFrame) -> pd.DataFrame:
         """
         Process the urine output dataset by resampling, interpolating missing values, and applying threshold-based adjustments.
@@ -97,7 +97,7 @@ def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
             pd.DataFrame: The processed urine output dataset as a pandas DataFrame.
         """

-        df = df.groupby(self._stay_identifier).resample("1H").sum()
+        df = df.groupby(self._stay_identifier).resample("1H").sum()  # type: ignore
         df[df["urineoutput"] == 0] = None

         if not self._interpolate:
@@ -122,7 +122,7 @@ def __init__(
         stay_identifier: str = "stay_id",
         time_identifier: str = "charttime",
         ffill: bool = True,
-        threshold: Optional[int] = 72,
+        threshold: int = 72,
     ) -> None:
         """
         Initialize a new instance of the CreatininePreProcessor class.
@@ -140,7 +140,7 @@ def __init__(

     @dataset_as_df(df=DatasetType.CREATININE)
     @df_to_dataset(DatasetType.CREATININE)
-    def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
+    def process(self, df: pd.DataFrame) -> pd.DataFrame:
         """
         Process the creatinine dataset by resampling and performing forward filling on missing values.
@@ -150,7 +150,7 @@ def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
         Returns:
             pd.DataFrame: The processed creatinine dataset as a pandas DataFrame.
         """
-        df = df.groupby(self._stay_identifier).resample("1H").mean()
+        df = df.groupby(self._stay_identifier).resample("1H").mean()  # type: ignore
         if not self._ffill:
             return df

@@ -163,7 +163,7 @@ class DemographicsPreProcessor(Preprocessor):

     @dataset_as_df(df=DatasetType.DEMOGRAPHICS)
     @df_to_dataset(DatasetType.DEMOGRAPHICS)
-    def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
+    def process(self, df: pd.DataFrame) -> pd.DataFrame:
         """
         Process the demographics dataset by aggregating the data based on stay identifiers.
@@ -181,7 +181,7 @@ class RRTPreProcessor(Preprocessor):

     @dataset_as_df(df=DatasetType.RRT)
     @df_to_dataset(DatasetType.RRT)
-    def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
+    def process(self, df: pd.DataFrame) -> pd.DataFrame:
         """
         Process the RRT dataset by upsampling the data and forward filling the last value. We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.
@@ -191,5 +191,5 @@ def process(self, df: pd.DataFrame = None) -> pd.DataFrame:
         Returns:
             pd.DataFrame: The processed RRT dataset as a pandas DataFrame.
         """
-        df = df.groupby(self._stay_identifier).resample("1H").last()
+        df = df.groupby(self._stay_identifier).resample("1H").last()  # type: ignore
         return df.ffill()
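
All four preprocessors share one idiom: group by stay, resample onto an hourly grid, then repair gaps (the added "# type: ignore" comments only silence pandas-stubs, which types the grouped resample too loosely). A toy sketch of the urine-output variant under the assumptions visible in the diff, where empty hours sum to zero, are blanked, and are then interpolated:

    import pandas as pd

    df = pd.DataFrame(
        {"stay_id": ["a", "a", "a"], "urineoutput": [30.0, 0.0, 45.0]},
        index=pd.to_datetime(
            ["2023-01-01 00:10", "2023-01-01 00:40", "2023-01-01 02:05"]
        ),
    )

    hourly = df.groupby("stay_id").resample("1H").sum()
    hourly[hourly["urineoutput"] == 0] = None  # zero-sum hours become missing
    print(hourly.interpolate(method="linear"))  # the empty 01:00 bin becomes 37.5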
26 changes: 13 additions & 13 deletions pyAKI/probes.py
@@ -3,12 +3,11 @@

 from abc import ABC, ABCMeta
 from enum import StrEnum, auto
-from typing import Optional, Dict

 import pandas as pd
 import numpy as np


 import logging
 from pyAKI.utils import dataset_as_df, df_to_dataset, approx_gte, Dataset, DatasetType


@@ -43,7 +42,7 @@ def probe(self, datasets: list[Dataset], **kwargs) -> pd.DataFrame:

     RESNAME: str = ""  # name of the column that will be added to the dataframe

-    def probe(self, datasets: list[Dataset], **kwargs) -> pd.DataFrame:
+    def probe(self, datasets: list[Dataset], **kwargs) -> list[Dataset]:
         """
         Abstract method to be implemented by subclasses.
@@ -123,8 +122,8 @@ def __init__(
     @df_to_dataset(DatasetType.URINEOUTPUT)
     def probe(
         self,
-        df: pd.DataFrame = None,
-        patient: pd.DataFrame = None,
+        df: pd.DataFrame,
+        patient: pd.DataFrame,
         **kwargs,
     ) -> pd.DataFrame:
         """
@@ -140,6 +139,9 @@ def probe(
         Returns:
             pd.DataFrame: The modified DataFrame with the urine output stage column added.
         """
+        if "weight" not in patient:
+            raise ValueError("Missing weight for stay")
+
         weight: pd.Series = patient["weight"]
         # fmt: off
         df.loc[:, self.RESNAME] = np.nan  # set all urineoutput_stage values to NaN
@@ -232,8 +234,8 @@ def creatinine_baseline(self, df: pd.DataFrame) -> pd.Series:
         Returns:
             pd.Series: The calculated creatinine baseline values.
         """
-
+
         if self._method == CreatinineBaselineMethod.FIRST:
             return (
                 df[df[self._column] > 0]
@@ -255,15 +257,15 @@ def creatinine_baseline(self, df: pd.DataFrame) -> pd.Series:
                 )

         if self._method == CreatinineBaselineMethod.FIXED:
-            values = (
+            values: pd.Series = (
                 df[df[self._column] > 0]
                 .rolling(self._baseline_timeframe)
                 .min()
                 .resample("1h")
                 .min()
                 .ffill()[self._column]
             )
-            min_value = values[
+            min_value: pd.DatetimeIndex = values[
                 values.index
                 <= (values.index[0] + pd.Timedelta(self._baseline_timeframe))
             ].min()  # calculate min value for first 7 days
@@ -294,7 +296,7 @@ class AbsoluteCreatinineProbe(AbstractCreatinineProbe):

     @dataset_as_df(df=DatasetType.CREATININE)
     @df_to_dataset(DatasetType.CREATININE)
-    def probe(self, df: pd.DataFrame = None, **kwargs) -> pd.DataFrame:
+    def probe(self, df: pd.DataFrame, **kwargs) -> pd.DataFrame:
         """
         Perform KDIGO stage calculation based on absolute creatinine elevations on the provided DataFrame.
@@ -337,7 +339,7 @@ class RelativeCreatinineProbe(AbstractCreatinineProbe):

     @dataset_as_df(df=DatasetType.CREATININE)
     @df_to_dataset(DatasetType.CREATININE)
-    def probe(self, df: pd.DataFrame = None, **kwargs) -> pd.DataFrame:
+    def probe(self, df: pd.DataFrame, **kwargs) -> pd.DataFrame:
         """
         Perform calculation of relative creatinine elevations on the provided DataFrame.
@@ -393,9 +395,7 @@ def __init__(self, column: str = "rrt_status") -> None:

     @dataset_as_df(df=DatasetType.RRT)
     @df_to_dataset(DatasetType.RRT)
-    def probe(
-        self, df: pd.DataFrame = None, **kwargs: Optional[Dict[str, str]]
-    ) -> pd.DataFrame:
+    def probe(self, df: pd.DataFrame) -> pd.DataFrame:
         """Perform calculation of RRT on the provided DataFrame."""
         df.loc[:, self.RESNAME] = 0
         df.loc[df[self._column] == 1, self.RESNAME] = 3
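
The corrected abstract signature documents the contract that process_stay in kdigo.py relies on: a probe consumes the full list of Dataset tuples and returns it with its result column attached, while the decorator pair from pyAKI.utils handles the unpacking and repacking. A sketch of a custom probe under that contract (the base-class export name "Probe", the "creat" column, and the cutoff are all assumptions for illustration):

    import pandas as pd

    from pyAKI.probes import Probe  # assumed name of the abstract base class
    from pyAKI.utils import DatasetType, dataset_as_df, df_to_dataset


    class HighCreatinineFlagProbe(Probe):
        """Toy probe: flags creatinine readings above a fixed cutoff."""

        RESNAME = "high_creatinine"  # column added to the creatinine frame

        @dataset_as_df(df=DatasetType.CREATININE)
        @df_to_dataset(DatasetType.CREATININE)
        def probe(self, df: pd.DataFrame, **kwargs) -> pd.DataFrame:
            df.loc[:, self.RESNAME] = (df["creat"] > 4.0).astype(int)
            return df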
18 changes: 11 additions & 7 deletions pyAKI/utils.py
@@ -1,4 +1,4 @@
-from typing import NamedTuple
+from typing import NamedTuple, cast
 from enum import StrEnum, auto
 from functools import wraps

@@ -37,7 +37,7 @@ class Dataset(NamedTuple):
     df: pd.DataFrame


-def dataset_as_df(**mapping: dict[str, DatasetType]):
+def dataset_as_df(**mapping: DatasetType):
     """
     Decorator factory for methods that process datasets with dataframes.
@@ -72,11 +72,15 @@ def process_data(self, data: pd.DataFrame, labels: pd.DataFrame):
         processed_datasets = my_instance.process_data(datasets)
     """
     # swap keys and values in the mapping
-    in_mapping: dict[DatasetType, str] = {v: k for k, v in mapping.items()}
+    in_mapping: dict[DatasetType, str] = {}
+    for k, v in mapping.items():
+        in_mapping[cast(DatasetType, v)] = k
+
+    # in_mapping: Dict[DatasetType, str] = {v: k for k, v in mapping.items()}

     def decorator(func):
         @wraps(func)
-        def wrapper(self, datasets: list[Dataset], *args: list, **kwargs: dict):
+        def wrapper(self, datasets: list[Dataset], *args, **kwargs) -> list[Dataset]:
             # map the dataset types to corresponding DataFrames
             _mapping: dict[str, pd.DataFrame] = {
                 in_mapping[dtype]: df
@@ -125,13 +129,13 @@ def process_dataframe(self, *args: list, **kwargs: dict) -> pd.DataFrame:

     def decorator(func):
         @wraps(func)
-        def wrapper(self, *args: list, **kwargs: dict):
+        def wrapper(self, *args: list, **kwargs: dict) -> Dataset:
             return Dataset(dtype, func(self, *args, **kwargs))

         return wrapper

     return decorator


-def approx_gte(x: pd.Series, y: pd.Series) -> bool:
-    return (x >= y).values | np.isclose(x, y)
+def approx_gte(x: pd.Series, y: pd.Series | float) -> bool | np.ndarray:
+    return np.logical_or(np.asarray(x >= y), np.isclose(x, y))
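
A usage sketch of the rewritten approx_gte (values are illustrative): np.asarray handles both Series and scalar right-hand sides, and np.isclose lets floating-point near-ties still count as greater-or-equal, which matters when computed rates are compared against exact stage thresholds.

    import pandas as pd

    from pyAKI.utils import approx_gte

    x = pd.Series([0.5, 0.4999999999, 0.3])
    print(approx_gte(x, 0.5))  # [ True  True False]: the near-tie passes
    print((x >= 0.5).values)   # [ True False False]: strict >= misses it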
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,5 +1,6 @@
 pandas==2.1.3
+pandas-stubs==2.1.1.230928
 numpy==1.26.2
 scipy==1.11.3
 pbr==6.0.0
-typer[all]==0.9.0
+typer[all]==0.9.0
