diff --git a/odtlearn/constrained_oct.py b/odtlearn/constrained_oct.py index e0d7c84..30e10d3 100644 --- a/odtlearn/constrained_oct.py +++ b/odtlearn/constrained_oct.py @@ -25,7 +25,13 @@ class ConstrainedOCT(FlowOCTMultipleSink): """ def __init__( - self, solver, _lambda, depth, time_limit, num_threads, verbose + self, + solver: str, + _lambda: float, + depth: int, + time_limit: int, + num_threads: None, + verbose: bool, ) -> None: super().__init__(solver, _lambda, depth, time_limit, num_threads, verbose) @@ -34,6 +40,6 @@ def __init__( def _define_side_constraints(self): pass - def _define_constraints(self): + def _define_constraints(self) -> None: super()._define_constraints() self._define_side_constraints() diff --git a/odtlearn/fair_oct.py b/odtlearn/fair_oct.py index 35f658c..8be5cf2 100644 --- a/odtlearn/fair_oct.py +++ b/odtlearn/fair_oct.py @@ -1,8 +1,12 @@ import warnings from itertools import combinations +from typing import Union import numpy as np import pandas as pd +from numpy import ndarray +from pandas.core.frame import DataFrame +from pandas.core.series import Series from sklearn.utils.multiclass import unique_labels from sklearn.utils.validation import check_array, check_is_fitted, check_X_y @@ -94,15 +98,15 @@ class FairConstrainedOCT(ConstrainedOCT): def __init__( self, - solver, - positive_class, - _lambda, - obj_mode, - fairness_bound, - depth, - time_limit, - num_threads, - verbose, + solver: str, + positive_class: int, + _lambda: float, + obj_mode: str, + fairness_bound: float, + depth: int, + time_limit: int, + num_threads: None, + verbose: bool, ) -> None: self._positive_class = positive_class self._fairness_bound = fairness_bound @@ -113,7 +117,13 @@ def __init__( self.weights = None super().__init__(solver, _lambda, depth, time_limit, num_threads, verbose) - def _extract_metadata(self, X, y, protect_feat): + def _extract_metadata( + self, + X: Union[ndarray, DataFrame], + y: Union[ndarray, Series], + protect_feat: Union[ndarray, DataFrame], + ) -> None: + super(ConstrainedOCT, self)._extract_metadata(X, y) if isinstance(protect_feat, pd.DataFrame): self._protect_feat_col_labels = protect_feat.columns @@ -123,7 +133,7 @@ def _extract_metadata(self, X, y, protect_feat): [f"P_{i}" for i in np.arange(0, protect_feat.shape[1])] ) - def _add_fairness_constraint(self, p_df, p_prime_df): + def _add_fairness_constraint(self, p_df: DataFrame, p_prime_df: DataFrame) -> bool: """ Add the fairness constraint to the MIP problem. @@ -194,7 +204,7 @@ def _add_fairness_constraint(self, p_df, p_prime_df): return constraint_added - def _define_objective(self): + def _define_objective(self) -> None: # Max sum(sum(zeta[i,n,y(i)])) obj = self._solver.lin_expr(0) for n in self._tree.Nodes: @@ -211,7 +221,14 @@ def _define_objective(self): self._solver.set_objective(obj, ODTL.MAXIMIZE) - def fit(self, X, y, protect_feat, legit_factor, weights=None): + def fit( + self, + X: ndarray, + y: ndarray, + protect_feat: ndarray, + legit_factor: ndarray, + weights: None = None, + ) -> Union["FairCSPOCT", "FairSPOCT", "FairEOddsOCT", "FairEOppOCT", "FairPEOCT"]: """ Fit the Fair Constrained Optimal Classification Tree (FairConstrainedOCT) model to the given training data. @@ -341,7 +358,7 @@ def fit(self, X, y, protect_feat, legit_factor, weights=None): # Return the classifier return self - def predict(self, X): + def predict(self, X: Union[DataFrame, ndarray]) -> ndarray: """ Predict class labels for samples in X using the fitted Fair Constrained Optimal Classification Tree model. @@ -433,15 +450,15 @@ class FairSPOCT(FairConstrainedOCT): def __init__( self, - solver, - positive_class, - depth=1, - time_limit=60, - _lambda=0, - obj_mode="acc", - fairness_bound=1, - num_threads=None, - verbose=False, + solver: str, + positive_class: int, + depth: int = 1, + time_limit: int = 60, + _lambda: float = 0, + obj_mode: str = "acc", + fairness_bound: float = 1, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( @@ -456,7 +473,7 @@ def __init__( verbose, ) - def _define_side_constraints(self): + def _define_side_constraints(self) -> None: # Loop through all possible combinations of the protected feature for protected_feature in self._P_col_labels: for combo in combinations(self._X_p[protected_feature].unique(), 2): @@ -467,7 +484,9 @@ def _define_side_constraints(self): p_prime_df = self._X_p[self._X_p[protected_feature] == p_prime] self._add_fairness_constraint(p_df, p_prime_df) - def calc_metric(self, protect_feat, y): + def calc_metric( + self, protect_feat: Union[DataFrame, ndarray], y: Union[Series, ndarray] + ): """ Calculate the statistical parity metric for the given data. @@ -553,15 +572,15 @@ class FairCSPOCT(FairConstrainedOCT): def __init__( self, - solver, - positive_class, - depth=1, - time_limit=60, - _lambda=0, - obj_mode="acc", - fairness_bound=1, - num_threads=None, - verbose=False, + solver: str, + positive_class: int, + depth: int = 1, + time_limit: int = 60, + _lambda: float = 0, + obj_mode: str = "acc", + fairness_bound: float = 1, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( @@ -576,7 +595,7 @@ def __init__( verbose, ) - def _define_side_constraints(self): + def _define_side_constraints(self) -> None: # Loop through all possible combinations of the protected feature for protected_feature in self._P_col_labels: for combo in combinations(self._X_p[protected_feature].unique(), 2): @@ -593,7 +612,12 @@ def _define_side_constraints(self): ] self._add_fairness_constraint(p_df, p_prime_df) - def calc_metric(self, protect_feat, legit_factor, y): + def calc_metric( + self, + protect_feat: Union[DataFrame, ndarray], + legit_factor: Union[DataFrame, ndarray], + y: Union[Series, ndarray], + ): """ Calculate the conditional statistical parity metric for the given data. @@ -697,15 +721,15 @@ class FairPEOCT(FairConstrainedOCT): def __init__( self, - solver, - positive_class, - depth=1, - time_limit=60, - _lambda=0, - obj_mode="acc", - fairness_bound=1, - num_threads=None, - verbose=False, + solver: str, + positive_class: int, + depth: int = 1, + time_limit: int = 60, + _lambda: float = 0, + obj_mode: str = "acc", + fairness_bound: float = 1, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( @@ -720,7 +744,7 @@ def __init__( verbose, ) - def _define_side_constraints(self): + def _define_side_constraints(self) -> None: # Loop through all possible combinations of the protected feature for protected_feature in self._P_col_labels: for combo in combinations(self._X_p[protected_feature].unique(), 2): @@ -736,7 +760,12 @@ def _define_side_constraints(self): ] self._add_fairness_constraint(p_df, p_prime_df) - def calc_metric(self, protect_feat, y, y_pred): + def calc_metric( + self, + protect_feat: Union[DataFrame, ndarray], + y: Union[Series, ndarray], + y_pred: Union[Series, ndarray], + ): """ Calculate the predictive equality metric for the given data. @@ -841,15 +870,15 @@ class FairEOppOCT(FairConstrainedOCT): def __init__( self, - solver, - positive_class, - depth=1, - time_limit=60, - _lambda=0, - obj_mode="acc", - fairness_bound=1, - num_threads=None, - verbose=False, + solver: str, + positive_class: int, + depth: int = 1, + time_limit: int = 60, + _lambda: float = 0, + obj_mode: str = "acc", + fairness_bound: float = 1, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( @@ -864,7 +893,7 @@ def __init__( verbose, ) - def _define_side_constraints(self): + def _define_side_constraints(self) -> None: # Loop through all possible combinations of the protected feature for protected_feature in self._P_col_labels: for combo in combinations(self._X_p[protected_feature].unique(), 2): @@ -918,15 +947,15 @@ class FairEOddsOCT(FairConstrainedOCT): def __init__( self, - solver, - positive_class, - depth=1, - time_limit=60, - _lambda=0, - obj_mode="acc", - fairness_bound=1, - num_threads=None, - verbose=False, + solver: str, + positive_class: int, + depth: int = 1, + time_limit: int = 60, + _lambda: float = 0, + obj_mode: str = "acc", + fairness_bound: float = 1, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( @@ -941,7 +970,7 @@ def __init__( verbose, ) - def _define_side_constraints(self): + def _define_side_constraints(self) -> None: # Loop through all possible combinations of the protected feature for protected_feature in self._P_col_labels: for combo in combinations(self._X_p[protected_feature].unique(), 2): diff --git a/odtlearn/flow_oct.py b/odtlearn/flow_oct.py index 7b15778..1a5aa9e 100644 --- a/odtlearn/flow_oct.py +++ b/odtlearn/flow_oct.py @@ -1,6 +1,10 @@ import warnings +from typing import Any, Union import numpy as np +from numpy import ndarray +from pandas.core.frame import DataFrame +from pandas.core.series import Series from sklearn.utils.multiclass import unique_labels from sklearn.utils.validation import check_array, check_is_fitted, check_X_y @@ -55,9 +59,10 @@ class FlowOCT(FlowOCTSingleSink): :mod:`FlowOCTSingleSink ` class and adds the specific objective function and model fitting process. - The class supports two objective modes: "acc" (accuracy) and "balance". The accuracy objective - aims to maximize the prediction accuracy of the learned tree, while the balance objective aims - to learn a balanced optimal decision tree to better generalize to out-of-sample data. + The class supports three objective modes: "acc" (accuracy), "balance", and "custom". The accuracy objective + aims to maximize the prediction accuracy of the learned tree, the balance objective aims + to learn a balanced optimal decision tree to better generalize to out-of-sample data, and the + custom objective allows users to pass their own weights. The The :meth:`fit ` method method is used to fit the optimal classification tree to the given training data. It @@ -83,13 +88,13 @@ class and adds the specific objective function and model fitting process. def __init__( self, - solver, - _lambda=0, - obj_mode="acc", - depth=1, - time_limit=60, - num_threads=None, - verbose=False, + solver: str, + _lambda: float = 0.0, + obj_mode: str = "acc", + depth: int = 1, + time_limit: int = 60, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( solver, @@ -102,9 +107,8 @@ def __init__( if obj_mode not in ["acc", "balance", "custom"]: raise ValueError("objective must be one of 'acc', 'balance', or 'custom'") self._obj_mode = obj_mode - self.weights = None - def _define_objective(self): + def _define_objective(self) -> None: obj = self._solver.lin_expr(0) for n in self._tree.Nodes: for f in self._X_col_labels: @@ -115,7 +119,12 @@ def _define_objective(self): self._solver.set_objective(obj, ODTL.MAXIMIZE) - def fit(self, X, y, weights=None): + def fit( + self, + X: Union[ndarray, DataFrame], + y: Union[ndarray, Series], + weights: Union[Series, ndarray, None] = None, + ) -> "FlowOCT": """ Fit the FlowOCT model to the given training data. @@ -175,6 +184,8 @@ def fit(self, X, y, weights=None): ) else: self.weights = np.array(weights) + else: + self.weights = None # Generate weights based on obj_mode if self._obj_mode == "acc": @@ -198,7 +209,7 @@ def fit(self, X, y, weights=None): return self - def predict(self, X): + def predict(self, X: Union[DataFrame, ndarray]) -> ndarray: """ Predict class labels for samples in X using the fitted FlowOCT model. @@ -345,13 +356,13 @@ class BendersOCT(FlowOCTSingleSink): def __init__( self, - solver, - _lambda=0, - obj_mode="acc", - depth=1, - time_limit=60, - num_threads=None, - verbose=False, + solver: str, + _lambda: float = 0.0, + obj_mode: str = "acc", + depth: int = 1, + time_limit: int = 60, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( @@ -377,10 +388,10 @@ def _define_variables(self): self._datapoints, vtype=ODTL.CONTINUOUS, ub=1, name="g" ) - def _define_constraints(self): + def _define_constraints(self) -> None: self._tree_structure_constraints() - def _define_objective(self): + def _define_objective(self) -> None: obj = self._solver.lin_expr(0) for n in self._tree.Nodes: for f in self._X_col_labels: @@ -391,7 +402,12 @@ def _define_objective(self): self._solver.set_objective(obj, ODTL.MAXIMIZE) - def fit(self, X, y, weights=None): + def fit( + self, + X: Union[ndarray, DataFrame], + y: Union[ndarray, Series], + weights: Union[Series, ndarray, None] = None, + ) -> "BendersOCT": """ Fit the BendersOCT model to the given training data. @@ -504,7 +520,7 @@ def fit(self, X, y, weights=None): return self - def predict(self, X): + def predict(self, X: Union[DataFrame, ndarray]) -> ndarray[Any, Any]: """ Predict class labels for samples in X using the fitted BendersOCT model. diff --git a/odtlearn/flow_oct_ms.py b/odtlearn/flow_oct_ms.py index 1dc48de..0330552 100644 --- a/odtlearn/flow_oct_ms.py +++ b/odtlearn/flow_oct_ms.py @@ -80,12 +80,12 @@ class FlowOCTMultipleSink(OptimalClassificationTree): def __init__( self, - solver, - _lambda, - depth, - time_limit, - num_threads, - verbose, + solver: str, + _lambda: float, + depth: int, + time_limit: int, + num_threads: None, + verbose: bool, ) -> None: self._lambda = _lambda @@ -97,7 +97,7 @@ def __init__( verbose, ) - def _tree_struc_variables(self): + def _tree_struc_variables(self) -> None: # b[n,f] ==1 iff at node n we branch on feature f self._b = self._solver.add_vars( self._tree.Nodes, self._X_col_labels, vtype=ODTL.BINARY, name="b" @@ -117,7 +117,7 @@ def _tree_struc_variables(self): name="w", ) - def _flow_variables(self): + def _flow_variables(self) -> None: # zeta[i,n,k] is the amount of flow through the edge connecting node n to sink node t,k for datapoint i self._zeta = self._solver.add_vars( self._datapoints, @@ -136,11 +136,11 @@ def _flow_variables(self): name="z", ) - def _define_variables(self): + def _define_variables(self) -> None: self._tree_struc_variables() self._flow_variables() - def _tree_structure_constraints(self): + def _tree_structure_constraints(self) -> None: # sum(b[n,f], f) + p[n] + sum(p[m], m in A(n)) = 1 forall n in Nodes self._solver.add_constrs( ( @@ -176,7 +176,7 @@ def _tree_structure_constraints(self): for n in self._tree.Nodes + self._tree.Leaves ) - def _flow_constraints(self): + def _flow_constraints(self) -> None: # Flow Constraints # z[i,n] = z[i,l(n)] + z[i,r(n)] + (zeta[i,n,k] for all k in Labels) forall i, n in Nodes for n in self._tree.Nodes: @@ -200,7 +200,7 @@ def _flow_constraints(self): for i in self._datapoints ) - def _arc_constraints(self): + def _arc_constraints(self) -> None: # Arc constraints # z[i,l(n)] <= sum(b[n,f], f if x[i,f]=0) forall i, n in Nodes for i in self._datapoints: @@ -236,7 +236,7 @@ def _arc_constraints(self): # z[i,1] = 1 for all i datapoints self._solver.add_constrs(self._z[i, 1] == 1 for i in self._datapoints) - def _define_constraints(self): + def _define_constraints(self) -> None: self._tree_structure_constraints() self._flow_constraints() self._arc_constraints() diff --git a/odtlearn/flow_oct_ss.py b/odtlearn/flow_oct_ss.py index 3fa9a8d..8a468c0 100644 --- a/odtlearn/flow_oct_ss.py +++ b/odtlearn/flow_oct_ss.py @@ -1,4 +1,4 @@ -# from gurobipy import GRB, quicksum +from typing import Union from odtlearn import ODTL from odtlearn.opt_ct import OptimalClassificationTree @@ -82,12 +82,12 @@ class FlowOCTSingleSink(OptimalClassificationTree): def __init__( self, - solver, - _lambda, - depth, - time_limit, - num_threads, - verbose, + solver: str, + _lambda: Union[int, float], + depth: int, + time_limit: int, + num_threads: Union[None, int], + verbose: bool, ) -> None: self._lambda = _lambda @@ -100,7 +100,7 @@ def __init__( verbose, ) - def _tree_struc_variables(self): + def _tree_struc_variables(self) -> None: # b[n,f] ==1 iff at node n we branch on feature f self._b = self._solver.add_vars( self._tree.Nodes, self._X_col_labels, vtype=ODTL.BINARY, name="b" @@ -120,7 +120,7 @@ def _tree_struc_variables(self): name="w", ) - def _flow_variables(self): + def _flow_variables(self) -> None: # zeta[i,n] is the amount of flow through the edge connecting node n # to sink node t for data-point i self._zeta = self._solver.add_vars( @@ -140,11 +140,11 @@ def _flow_variables(self): name="z", ) - def _define_variables(self): + def _define_variables(self) -> None: self._tree_struc_variables() self._flow_variables() - def _tree_structure_constraints(self): + def _tree_structure_constraints(self) -> None: # sum(b[n,f], f) + p[n] + sum(p[m], m in A(n)) = 1 forall n in Nodes self._solver.add_constrs( ( @@ -172,7 +172,7 @@ def _tree_structure_constraints(self): for n in self._tree.Leaves ) - def _flow_constraints(self): + def _flow_constraints(self) -> None: # z[i,n] = z[i,l(n)] + z[i,r(n)] + zeta[i,n] forall i, n in Nodes for n in self._tree.Nodes: n_left = int(self._tree.get_left_children(n)) @@ -191,7 +191,7 @@ def _flow_constraints(self): self._zeta[i, n] == self._z[i, n] for i in self._datapoints ) - def _arc_constraints(self): + def _arc_constraints(self) -> None: # z[i,l(n)] <= sum(b[n,f], f if x[i,f]=0) forall i, n in Nodes # changed this to loop over the indicies of X and check if the column values at a given idx # equals zero @@ -228,7 +228,7 @@ def _arc_constraints(self): self._zeta[i, n] <= self._w[n, self._y[i]] for i in self._datapoints ) - def _define_constraints(self): + def _define_constraints(self) -> None: self._tree_structure_constraints() self._flow_constraints() self._arc_constraints() diff --git a/odtlearn/flow_opt.py b/odtlearn/flow_opt.py index c6f8898..a63bf83 100644 --- a/odtlearn/flow_opt.py +++ b/odtlearn/flow_opt.py @@ -1,3 +1,8 @@ +from typing import Union + +from numpy import ndarray +from pandas.core.frame import DataFrame +from pandas.core.series import Series from sklearn.utils.validation import check_array, check_is_fitted, check_X_y from odtlearn import ODTL @@ -88,11 +93,11 @@ class to make predictions and visualize the learned tree. def __init__( self, - solver, - depth=1, - time_limit=60, - num_threads=None, - verbose=False, + solver: str, + depth: int = 1, + time_limit: int = 60, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( solver, @@ -102,7 +107,13 @@ def __init__( verbose, ) - def fit(self, X, t, y, ipw): + def fit( + self, + X: Union[ndarray, DataFrame], + t: Union[Series, ndarray], + y: Union[Series, ndarray], + ipw: Union[Series, ndarray], + ) -> "FlowOPT_IPW": """ Fit the FlowOPT_IPW model to the given training data. @@ -179,7 +190,7 @@ def fit(self, X, t, y, ipw): # Return the classifier return self - def predict(self, X): + def predict(self, X: Union[ndarray, DataFrame]) -> ndarray: """ Predict optimal treatments for samples in X using the fitted FlowOPT_IPW model. @@ -297,11 +308,11 @@ class FlowOPT_DM(FlowOPTMultipleSink): def __init__( self, - solver, - depth=1, - time_limit=60, - num_threads=None, - verbose=False, + solver: str, + depth: int = 1, + time_limit: int = 60, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__( solver, @@ -311,7 +322,7 @@ def __init__( verbose, ) - def _define_objective(self): + def _define_objective(self) -> None: # define objective function obj = self._solver.lin_expr(0) for i in self._datapoints: @@ -323,7 +334,13 @@ def _define_objective(self): self._solver.set_objective(obj, ODTL.MAXIMIZE) - def fit(self, X, t, y, y_hat): + def fit( + self, + X: Union[ndarray, DataFrame], + t: Union[Series, ndarray], + y: Union[Series, ndarray], + y_hat: Union[ndarray, DataFrame], + ) -> "FlowOPT_DM": """ Fit the FlowOPT_DM model to the given training data. @@ -401,7 +418,7 @@ def fit(self, X, t, y, y_hat): # Return the classifier return self - def predict(self, X): + def predict(self, X: Union[ndarray, DataFrame]) -> ndarray: """ Predict optimal treatments for samples in X using the fitted FlowOPT_DM model. @@ -520,11 +537,23 @@ class FlowOPT_DR(FlowOPTMultipleSink): """ def __init__( - self, solver, depth=1, time_limit=60, num_threads=None, verbose=False + self, + solver: str, + depth: int = 1, + time_limit: int = 60, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: super().__init__(solver, depth, time_limit, num_threads, verbose) - def fit(self, X, t, y, ipw, y_hat): + def fit( + self, + X: Union[ndarray, DataFrame], + t: Union[Series, ndarray], + y: Union[Series, ndarray], + ipw: Union[Series, ndarray], + y_hat: Union[ndarray, DataFrame], + ) -> "FlowOPT_DR": """ Fit the FlowOPT_DR model to the given training data. @@ -607,7 +636,7 @@ def fit(self, X, t, y, ipw, y_hat): # Return the classifier return self - def predict(self, X): + def predict(self, X: Union[ndarray, DataFrame]) -> ndarray: """ Predict optimal treatments for samples in X using the fitted FlowOPT_DR model. @@ -662,7 +691,7 @@ def predict(self, X): return self._make_prediction(X) - def _define_objective(self): + def _define_objective(self) -> None: # define objective function obj = self._solver.lin_expr(0) for i in self._datapoints: diff --git a/odtlearn/flow_opt_ms.py b/odtlearn/flow_opt_ms.py index e147d39..e6d175a 100644 --- a/odtlearn/flow_opt_ms.py +++ b/odtlearn/flow_opt_ms.py @@ -99,11 +99,11 @@ class FlowOPTMultipleSink(OptimalPrescriptiveTree): def __init__( self, - solver, - depth, - time_limit, - num_threads, - verbose, + solver: str, + depth: int, + time_limit: int, + num_threads: None, + verbose: bool, ) -> None: super().__init__( @@ -114,7 +114,7 @@ def __init__( verbose, ) - def _tree_struc_variables(self): + def _tree_struc_variables(self) -> None: self._b = self._solver.add_vars( self._tree.Nodes, self._X_col_labels, vtype=ODTL.BINARY, name="b" ) @@ -129,7 +129,7 @@ def _tree_struc_variables(self): name="w", ) - def _flow_variables(self): + def _flow_variables(self) -> None: self._zeta = self._solver.add_vars( self._datapoints, self._tree.Nodes + self._tree.Leaves, @@ -146,11 +146,11 @@ def _flow_variables(self): name="z", ) - def _define_variables(self): + def _define_variables(self) -> None: self._tree_struc_variables() self._flow_variables() - def _tree_structure_constraints(self): + def _tree_structure_constraints(self) -> None: # sum(b[n,f], f) + p[n] + sum(p[m], m in A(n)) = 1 forall n in Nodes self._solver.add_constrs( ( @@ -181,7 +181,7 @@ def _tree_structure_constraints(self): for n in self._tree.Nodes + self._tree.Leaves ) - def _flow_constraints(self): + def _flow_constraints(self) -> None: # z[i,n] = z[i,l(n)] + z[i,r(n)] + zeta[i,n] forall i, n in Nodes for n in self._tree.Nodes: n_left = int(self._tree.get_left_children(n)) @@ -205,7 +205,7 @@ def _flow_constraints(self): for i in self._datapoints ) - def _arc_constraints(self): + def _arc_constraints(self) -> None: # z[i,l(n)] <= sum(b[n,f], f if x[i,f]<=0) forall i, n in Nodes for i in self._datapoints: self._solver.add_constrs( @@ -243,7 +243,7 @@ def _arc_constraints(self): self._solver.add_constrs(self._z[i, 1] == 1 for i in self._datapoints) - def _define_constraints(self): + def _define_constraints(self) -> None: self._tree_structure_constraints() self._flow_constraints() self._arc_constraints() diff --git a/odtlearn/flow_opt_ss.py b/odtlearn/flow_opt_ss.py index e27b2bd..b389cab 100644 --- a/odtlearn/flow_opt_ss.py +++ b/odtlearn/flow_opt_ss.py @@ -81,11 +81,11 @@ class FlowOPTSingleSink(OptimalPrescriptiveTree): def __init__( self, - solver, - depth, - time_limit, - num_threads, - verbose, + solver: str, + depth: int, + time_limit: int, + num_threads: None, + verbose: bool, ) -> None: super().__init__( solver, @@ -95,7 +95,7 @@ def __init__( verbose, ) - def _tree_struc_variables(self): + def _tree_struc_variables(self) -> None: self._b = self._solver.add_vars( self._tree.Nodes, self._X_col_labels, vtype=ODTL.BINARY, name="b" ) @@ -110,7 +110,7 @@ def _tree_struc_variables(self): name="w", ) - def _flow_variables(self): + def _flow_variables(self) -> None: self._zeta = self._solver.add_vars( self._datapoints, self._tree.Nodes + self._tree.Leaves, @@ -126,11 +126,11 @@ def _flow_variables(self): name="z", ) - def _define_variables(self): + def _define_variables(self) -> None: self._tree_struc_variables() self._flow_variables() - def _tree_structure_constraints(self): + def _tree_structure_constraints(self) -> None: # sum(b[n,f], f) + p[n] + sum(p[m], m in A(n)) = 1 forall n in Nodes self._solver.add_constrs( ( @@ -161,7 +161,7 @@ def _tree_structure_constraints(self): for n in self._tree.Nodes + self._tree.Leaves ) - def _flow_constraints(self): + def _flow_constraints(self) -> None: # z[i,n] = z[i,l(n)] + z[i,r(n)] + zeta[i,n] forall i, n in Nodes for n in self._tree.Nodes: n_left = int(self._tree.get_left_children(n)) @@ -179,7 +179,7 @@ def _flow_constraints(self): self._zeta[i, n] == self._z[i, n] for i in self._datapoints ) - def _arc_constraints(self): + def _arc_constraints(self) -> None: # z[i,l(n)] <= sum(b[n,f], f if x[i,f]<=0) forall i, n in Nodes for i in self._datapoints: self._solver.add_constrs( @@ -214,12 +214,12 @@ def _arc_constraints(self): self._zeta[i, n] <= self._w[n, self._t[i]] for i in self._datapoints ) - def _define_constraints(self): + def _define_constraints(self) -> None: self._tree_structure_constraints() self._flow_constraints() self._arc_constraints() - def _define_objective(self): + def _define_objective(self) -> None: # define objective function obj = self._solver.lin_expr(0) for i in self._datapoints: diff --git a/odtlearn/opt_ct.py b/odtlearn/opt_ct.py index 3e889b9..a539c59 100644 --- a/odtlearn/opt_ct.py +++ b/odtlearn/opt_ct.py @@ -1,5 +1,10 @@ +from typing import Dict, Optional, Tuple, Union + import numpy as np import pandas as pd +from numpy import int64, ndarray, str_ +from pandas.core.frame import DataFrame +from pandas.core.series import Series from sklearn.utils.validation import check_is_fitted from odtlearn.opt_dt import OptimalDecisionTree @@ -53,15 +58,17 @@ class to learn optimal classification trees. It formulates the problem as a mixe def __init__( self, - solver, - depth, - time_limit, - num_threads, - verbose, + solver: str, + depth: int, + time_limit: int, + num_threads: Union[None, int], + verbose: bool, ) -> None: super().__init__(solver, depth, time_limit, num_threads, verbose) - def _extract_metadata(self, X, y): + def _extract_metadata( + self, X: Union[ndarray, DataFrame], y: Union[ndarray, Series] + ) -> None: """A function for extracting metadata from the inputs before converting them into numpy arrays to work with the sklearn API @@ -81,12 +88,23 @@ def _extract_metadata(self, X, y): self._datapoints = np.arange(0, self._X.shape[0]) if isinstance(y, (pd.Series, pd.DataFrame)): - self._y = y.values.squeeze() + y_val = y.values + self._y = y_val.squeeze() else: self._y = y self._labels = np.unique(self._y) - def _get_node_status(self, b, w, p, n, feature_names=None): + def _get_node_status( + self, + b: Dict[Tuple[int, str_], float], + w: Dict[Tuple[int, int64], float], + p: Dict[int, float], + n: Union[int, int64], + feature_names: Optional[ndarray] = None, + ) -> Union[ + Tuple[bool, bool, None, int, bool, int64], + Tuple[bool, bool, str_, int, bool, None], + ]: """ This function give the status of a given node in a tree. By status we mean whether the node 1- is pruned? i.e., we have made a prediction at one of its ancestors @@ -150,7 +168,7 @@ def _get_node_status(self, b, w, p, n, feature_names=None): branching = True return pruned, branching, selected_feature, cutoff, leaf, value - def _make_prediction(self, X): + def _make_prediction(self, X: ndarray) -> ndarray: prediction = [] for i in range(X.shape[0]): current = 1 @@ -186,7 +204,7 @@ def _make_prediction(self, X): current = self._tree.get_left_children(current) return np.array(prediction) - def print_tree(self): + def print_tree(self) -> None: """ Print a text representation of the fitted tree. diff --git a/odtlearn/opt_dt.py b/odtlearn/opt_dt.py index f140bd5..e02a281 100644 --- a/odtlearn/opt_dt.py +++ b/odtlearn/opt_dt.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod +from typing import Union import mip +from sklearn.utils.validation import check_is_fitted from odtlearn.utils.solver import Solver from odtlearn.utils.Tree import _Tree @@ -70,7 +72,12 @@ class OptimalDecisionTree(ABC): """ def __init__( - self, solver, depth=1, time_limit=60, num_threads=None, verbose=False + self, + solver: str, + depth: int = 1, + time_limit: int = 60, + num_threads: Union[None, int] = None, + verbose: bool = False, ) -> None: self.solver_name = solver @@ -88,7 +95,7 @@ def __init__( self._solver.model.threads = num_threads self._solver.model.max_seconds = time_limit - def __repr__(self): + def __repr__(self) -> str: rep = ( f"{type(self).__name__}(solver={self.solver_name}," f"depth={self._depth}," @@ -110,7 +117,7 @@ def _define_constraints(self): def _define_objective(self): pass - def _create_main_problem(self): + def _create_main_problem(self) -> None: """ This function creates and return a model based on the variables, constraints, and objective defined within a subclass @@ -192,3 +199,97 @@ def store_search_progress_log(self) -> bool: @store_search_progress_log.setter def store_search_progress_log(self, store: bool): self._solver.model.store_search_progress_log = store + + def plot_search_progress( + self, + ax=None, + title="Search Progress", + xlabel="Time (s)", + ylabel="Objective Bound", + lw=2, + alpha=0.8, + legend_loc="best", + legend_fontsize=None, + grid=True, + color_ub="b", + color_lb="r", + log_scale=False, + ) -> None: + """ + Plots the search progress log showing the lower and upper objective bounds over time. + + Parameters + ---------- + ax : matplotlib axis, default=None + Axes to plot to. If None, use current axis. Any previous content is cleared. + title : str, optional, default: 'Search Progress' + The title of the plot. + xlabel : str, optional, default: 'Time (s)' + The label for the x-axis. + ylabel : str, optional, default: 'Objective Bound' + The label for the y-axis. + lw : float, optional, default: 2 + The line width of the bound lines. + alpha : float, optional, default: 0.8 + The alpha blending value between 0 (transparent) and 1 (opaque). + legend_loc : str, optional, default: 'best' + The location of the legend. + legend_fontsize : int or str, optional + The font size for the legend labels. + grid : bool, optional, default: True + Whether to show the grid lines. + color_ub : color str or tuple of floats, optional, default: 'b' + The color to use for the upper bound line. + color_lb : color str or tuple of floats, optional, default: 'r' + The color to use for the lower bound line. + log_scale : bool, optional, default: False + Whether to use a log scale for the y-axis. + + Raises + ------ + NotFittedError + If the model has not been fitted yet. + AttributeError + If the search progress log was not recorded during fitting. + + Notes + ----- + The search progress log must be enabled prior to fitting by setting + `store_search_progress_log` to True. + """ + import matplotlib.pyplot as plt + + # Check if model has been fit + check_is_fitted(self, ["b_value", "w_value"]) + + # Check if search progress log exists + if len(self.search_progress_log.log) == 0: + raise AttributeError( + "No search progress log found. Make sure to set " + "'store_search_progress_log=True' before fitting." + ) + + # Extract times and bounds from log + times, bounds = zip(*self.search_progress_log.log) + lb, ub = zip(*bounds) + + # Create plot + if ax is None: + ax = plt.gca() + ax.clear() + ax.plot(times, lb, label="Lower Bound", lw=lw, alpha=alpha, color=color_lb) + ax.plot(times, ub, label="Upper Bound", lw=lw, alpha=alpha, color=color_ub) + + ax.set_xlabel(xlabel) + ax.set_ylabel(ylabel) + ax.set_title(title) + + if log_scale: + ax.set_yscale("log") + + ax.legend(loc=legend_loc, fontsize=legend_fontsize) + + if grid: + ax.grid() + + return ax diff --git a/odtlearn/opt_pt.py b/odtlearn/opt_pt.py index beca3e9..9bbcc54 100644 --- a/odtlearn/opt_pt.py +++ b/odtlearn/opt_pt.py @@ -4,6 +4,10 @@ from odtlearn.opt_dt import OptimalDecisionTree from odtlearn.utils.TreePlotter import MPLPlotter +from numpy import int64, ndarray, str_ +from pandas.core.frame import DataFrame +from pandas.core.series import Series +from typing import Dict, Tuple, Union class OptimalPrescriptiveTree(OptimalDecisionTree): @@ -69,15 +73,20 @@ class OptimalPrescriptiveTree(OptimalDecisionTree): def __init__( self, - solver, - depth, - time_limit, - num_threads, - verbose, + solver: str, + depth: int, + time_limit: int, + num_threads: None, + verbose: bool, ) -> None: super().__init__(solver, depth, time_limit, num_threads, verbose) - def _extract_metadata(self, X, y, t): + def _extract_metadata( + self, + X: Union[DataFrame, ndarray], + y: Union[Series, ndarray], + t: Union[Series, ndarray], + ) -> None: """A function for extracting metadata from the inputs before converting them into numpy arrays to work with the sklearn API @@ -105,7 +114,18 @@ def _extract_metadata(self, X, y, t): self._t = t self._treatments = np.unique(t) - def _get_node_status(self, b, w, p, n, feature_names=None): + def _get_node_status( + self, + b: Union[Dict[Tuple[int, str], float], Dict[Tuple[int, str_], float]], + w: Dict[Tuple[int, int64], float], + p: Dict[int, float], + n: int, + feature_names: None = None, + ) -> Union[ + Tuple[bool, bool, str, int, bool, None], + Tuple[bool, bool, None, int, bool, int64], + Tuple[bool, bool, str_, int, bool, None], + ]: """ This function give the status of a given node in a tree. By status we mean whether the node 1- is pruned? i.e., we have made a prediction at one of its ancestors @@ -169,7 +189,7 @@ def _get_node_status(self, b, w, p, n, feature_names=None): branching = True return pruned, branching, selected_feature, cutoff, leaf, value - def _make_prediction(self, X): + def _make_prediction(self, X: ndarray) -> ndarray: prediction = [] for i in range(X.shape[0]): current = 1 diff --git a/odtlearn/utils/Tree.py b/odtlearn/utils/Tree.py index f8cf9bc..8a57b62 100644 --- a/odtlearn/utils/Tree.py +++ b/odtlearn/utils/Tree.py @@ -1,4 +1,6 @@ import numpy as np +from numpy import int64 +from typing import Any, List, Union class _Tree: @@ -12,19 +14,19 @@ class _Tree: In this class we assume that we have a complete binary tree; we only receive the depth from the user """ - def __init__(self, d): + def __init__(self, d: int) -> None: self.depth = d self.Nodes = [i for i in range(1, np.power(2, d))] self.Leaves = [i for i in range(np.power(2, d), np.power(2, d + 1))] self.total_nodes = len(self.Nodes) + len(self.Leaves) - def get_left_children(self, n): + def get_left_children(self, n: int) -> int: if n in self.Nodes: return 2 * n else: raise IndexError("Node index not found in tree") - def get_right_children(self, n): + def get_right_children(self, n: int) -> int: if n in self.Nodes: return 2 * n + 1 else: @@ -36,7 +38,7 @@ def get_parent(self, n): else: raise IndexError("Node index not found in tree") - def get_ancestors(self, n): + def get_ancestors(self, n: Union[int, int64]) -> List[Union[Any, int]]: ancestors = [] if (n in self.Nodes) or (n in self.Leaves): current = n diff --git a/odtlearn/utils/binarize.py b/odtlearn/utils/binarize.py index f72f168..8f5feea 100644 --- a/odtlearn/utils/binarize.py +++ b/odtlearn/utils/binarize.py @@ -2,6 +2,8 @@ import pandas as pd from sklearn.base import BaseEstimator, TransformerMixin from sklearn.preprocessing import KBinsDiscretizer, OneHotEncoder +from pandas.core.frame import DataFrame +from typing import List, Optional class Binarizer(BaseEstimator, TransformerMixin): @@ -53,12 +55,12 @@ class Binarizer(BaseEstimator, TransformerMixin): def __init__( self, - categorical_cols=None, - integer_cols=None, - real_cols=None, - n_bins=4, - bin_strategy="uniform", - ): + categorical_cols: Optional[List[str]] = None, + integer_cols: Optional[List[str]] = None, + real_cols: Optional[List[str]] = None, + n_bins: int = 4, + bin_strategy: str = "uniform", + ) -> None: assert any( [x is not None for x in [categorical_cols, integer_cols, real_cols]] ), ( @@ -91,7 +93,7 @@ def __init__( self.encoders_ = {} self.column_names_ = None - def fit(self, X, y=None): + def fit(self, X: DataFrame, y: None = None) -> "Binarizer": """ Fit the Binarizer to the input data. @@ -137,7 +139,7 @@ def fit(self, X, y=None): return self - def transform(self, X): + def transform(self, X: DataFrame) -> DataFrame: """ Transform the input data using the fitted Binarizer. @@ -201,7 +203,7 @@ def transform(self, X): return result[self.column_names_] - def _get_feature_names_out(self): + def _get_feature_names_out(self) -> List[str]: """Get feature names for the binarized columns.""" feature_names = [] diff --git a/odtlearn/utils/callback_helpers.py b/odtlearn/utils/callback_helpers.py index 213bc83..15fbc02 100644 --- a/odtlearn/utils/callback_helpers.py +++ b/odtlearn/utils/callback_helpers.py @@ -1,11 +1,17 @@ import copy +from typing import Any, List, Union import numpy as np +from mip.entities import LinExpr + +from odtlearn.utils.solver import Solver # helper functions for BenderOCT callback -def get_left_exp_integer(solver, main_grb_obj, n, i): +def get_left_exp_integer( + solver: Solver, main_grb_obj: "BendersOCT", n: int, i: int # noqa: F821 +) -> LinExpr: """ Get the expression for the left branch constraint in the Benders' subproblem. @@ -34,7 +40,9 @@ def get_left_exp_integer(solver, main_grb_obj, n, i): return lhs -def get_right_exp_integer(solver, main_grb_obj, n, i): +def get_right_exp_integer( + solver: Solver, main_grb_obj: "BendersOCT", n: int, i: int # noqa: F821 +) -> LinExpr: """ Get the expression for the right branch constraint in the Benders' subproblem. @@ -63,7 +71,9 @@ def get_right_exp_integer(solver, main_grb_obj, n, i): return lhs -def get_target_exp_integer(main_grb_obj, n, i): +def get_target_exp_integer( + main_grb_obj: "BendersOCT", n: int, i: int # noqa: F821 +) -> LinExpr: """ Get the expression for the target constraint in the Benders' subproblem. @@ -86,7 +96,14 @@ def get_target_exp_integer(main_grb_obj, n, i): return lhs -def get_cut_integer(solver, main_grb_obj, left, right, target, i): +def get_cut_integer( + solver: Solver, + main_grb_obj: "BendersOCT", # noqa: F821 + left: List[Union[Any, int]], + right: List[Union[Any, int]], + target: List[int], + i: int, +) -> LinExpr: """ Get the Benders' cut expression for the current subproblem. diff --git a/odtlearn/utils/callbacks.py b/odtlearn/utils/callbacks.py index 060e40a..3e76d3a 100644 --- a/odtlearn/utils/callbacks.py +++ b/odtlearn/utils/callbacks.py @@ -1,7 +1,10 @@ import copy import heapq +from typing import Any, Dict, List, Tuple, Union from mip import ConstrsGenerator, Model +from numpy import int64, str_ +from pandas.core.frame import DataFrame from odtlearn.utils.callback_helpers import ( get_all_terminal_paths, @@ -10,9 +13,20 @@ get_nominal_path, shortest_path_solver, ) - - -def benders_subproblem(main_model_obj, b, p, w, i): +from odtlearn.utils.solver import Solver + + +def benders_subproblem( + main_model_obj: "BendersOCT", # noqa: F821 + b: Union[Dict[Tuple[int, str_], float], Dict[Tuple[int, str], float]], + p: Dict[int, float], + w: Dict[Tuple[int, int64], float], + i: int, +) -> Union[ + Tuple[int, List[int], List[Any], List[int]], + Tuple[int, List[Any], List[int], List[int]], + Tuple[int, List[int], List[int], List[int]], +]: """ Solve the Benders' subproblem for a given datapoint. @@ -89,7 +103,9 @@ class BendersCallback(ConstrsGenerator): """ - def __init__(self, X, obj, solver, **kwargs): + def __init__( + self, X: DataFrame, obj: "BendersOCT", solver: Solver, **kwargs # noqa: F821 + ) -> None: self.X = X self.obj = obj self.solver = solver @@ -98,7 +114,7 @@ def __init__(self, X, obj, solver, **kwargs): self.b = kwargs.get("b") self.w = kwargs.get("w") - def generate_constrs(self, model: Model, depth: int = 0, npass: int = 0): + def generate_constrs(self, model: Model, depth: int = 0, npass: int = 0) -> None: """ Generate Benders' cuts at the current node in the branch-and-bound tree. diff --git a/odtlearn/utils/validation.py b/odtlearn/utils/validation.py index ec58b8b..fdcf21b 100644 --- a/odtlearn/utils/validation.py +++ b/odtlearn/utils/validation.py @@ -1,5 +1,11 @@ +from typing import Union + import numpy as np import pandas as pd +from numpy import ndarray +from pandas.core.frame import DataFrame +from pandas.core.indexes.base import Index +from pandas.core.series import Series from sklearn.utils.validation import ( _assert_all_finite, check_array, @@ -8,7 +14,7 @@ ) -def check_ipw(X, ipw): +def check_ipw(X: ndarray, ipw: Union[ndarray, Series]) -> ndarray: """ Check and validate inverse probability weights (IPW). @@ -56,7 +62,9 @@ def check_ipw(X, ipw): return ipw -def check_y_hat(X, treatments, y_hat): +def check_y_hat( + X: ndarray, treatments: ndarray, y_hat: Union[DataFrame, ndarray] +) -> ndarray: """ Check and validate counterfactual predictions (y_hat). @@ -112,7 +120,7 @@ def check_y_hat(X, treatments, y_hat): return y_hat -def check_y(X, y): +def check_y(X: ndarray, y: Union[ndarray, Series]) -> ndarray: """ Check and validate target values (y). @@ -153,7 +161,9 @@ def check_y(X, y): return y -def check_columns_match(original_columns, new_data): +def check_columns_match( + original_columns: Union[ndarray, Index], new_data: Union[ndarray, DataFrame] +) -> None: """ Check if the columns in new_data match the original_columns. @@ -211,7 +221,7 @@ def check_columns_match(original_columns, new_data): ), f"Fit data has {len(original_columns)} columns but new data has {new_data.shape[1]} columns." -def check_binary(df): +def check_binary(df: Union[DataFrame, ndarray]) -> None: """ Check if all values in the DataFrame are binary (0 or 1).