diff --git a/.gitignore b/.gitignore index b39dd5438..351f7898f 100644 --- a/.gitignore +++ b/.gitignore @@ -234,8 +234,6 @@ docs/_build/ .pybuilder/ target/ -# Jupyter Notebook -.ipynb_checkpoints # IPython profile_default/ @@ -302,3 +300,6 @@ gurobi.log /documentation_builder/test*\.* /.benchmarks /.testmondata + +# sonarlint +.idea/sonarlint* diff --git a/release-notes/next-release.md b/release-notes/next-release.md index 2ddd48ce4..a3c099474 100644 --- a/release-notes/next-release.md +++ b/release-notes/next-release.md @@ -6,6 +6,8 @@ ## Other +* Resolve `flake8` issues and add missing type annotations and docstrings in `src/cobra/io` and `tests/test_io` (#1212). + ## Deprecated features ## Backwards incompatible changes diff --git a/src/cobra/core/__init__.py b/src/cobra/core/__init__.py index bf171e936..12e082517 100644 --- a/src/cobra/core/__init__.py +++ b/src/cobra/core/__init__.py @@ -6,5 +6,5 @@ from cobra.core.object import Object from cobra.core.reaction import Reaction from cobra.core.group import Group -from cobra.core.solution import Solution, LegacySolution, get_solution +from cobra.core.solution import Solution, get_solution from cobra.core.species import Species diff --git a/src/cobra/core/object.py b/src/cobra/core/object.py index 78d092d05..88e489b6b 100644 --- a/src/cobra/core/object.py +++ b/src/cobra/core/object.py @@ -2,6 +2,8 @@ from typing import Optional +from cobra.util import resettable + class Object: """Defines common behavior of object in cobra.core.""" @@ -81,6 +83,7 @@ def annotation(self) -> dict: return self._annotation @annotation.setter + @resettable def annotation(self, annotation): """Set annotation. diff --git a/src/cobra/core/singleton.py b/src/cobra/core/singleton.py index 158262fed..2ca183ec5 100644 --- a/src/cobra/core/singleton.py +++ b/src/cobra/core/singleton.py @@ -1,9 +1,5 @@ -# -*- coding: utf-8 -*- - """Define the singleton meta class.""" -from __future__ import absolute_import - class Singleton(type): """Implementation of the singleton pattern as a meta class.""" diff --git a/src/cobra/core/solution.py b/src/cobra/core/solution.py index a24a1a547..77a729417 100644 --- a/src/cobra/core/solution.py +++ b/src/cobra/core/solution.py @@ -1,33 +1,42 @@ -# -*- coding: utf-8 -*- - """Provide unified interfaces to optimization solutions.""" -from __future__ import absolute_import - import logging -from builtins import object, super -from warnings import warn +from typing import TYPE_CHECKING, Iterable, Optional -from numpy import empty, nan +import numpy as np +import pandas as pd from optlang.interface import OPTIMAL -from pandas import DataFrame, Series, option_context -from cobra.util.solver import check_solver_status +from ..util.solver import check_solver_status + + +if TYPE_CHECKING: + from cobra import Metabolite, Model, Reaction -__all__ = ("Solution", "LegacySolution", "get_solution") +__all__ = ("Solution", "get_solution") -LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) -class Solution(object): +class Solution: """ A unified interface to a `cobra.Model` optimization solution. - Notes - ----- - Solution is meant to be constructed by `get_solution` please look at that - function to fully understand the `Solution` class. + Parameters + ---------- + objective_value : float + The (optimal) value for the objective function. + status : str + The solver status related to the solution. + fluxes : pandas.Series + Contains the reaction fluxes (primal values of variables). + reduced_costs : pandas.Series + Contains reaction reduced costs (dual values of variables) + (default None). + shadow_prices : pandas.Series + Contains metabolite shadow prices (dual values of constraints) + (default None). Attributes ---------- @@ -41,173 +50,97 @@ class Solution(object): Contains reaction reduced costs (dual values of variables). shadow_prices : pandas.Series Contains metabolite shadow prices (dual values of constraints). + + Notes + ----- + Solution is meant to be constructed by `get_solution` please look at that + function to fully understand the `Solution` class. + """ def __init__( self, - objective_value, - status, - fluxes, - reduced_costs=None, - shadow_prices=None, - **kwargs - ): + objective_value: float, + status: str, + fluxes: pd.Series, + reduced_costs: Optional[pd.Series] = None, + shadow_prices: Optional[pd.Series] = None, + **kwargs, + ) -> None: """ Initialize a `Solution` from its components. - Parameters - ---------- - objective_value : float - The (optimal) value for the objective function. - status : str - The solver status related to the solution. - fluxes : pandas.Series - Contains the reaction fluxes (primal values of variables). - reduced_costs : pandas.Series - Contains reaction reduced costs (dual values of variables). - shadow_prices : pandas.Series - Contains metabolite shadow prices (dual values of constraints). + Other Parameters + ---------------- + kwargs : + Further keyword arguments are passed on to the parent class. + """ - super(Solution, self).__init__(**kwargs) + super().__init__(**kwargs) self.objective_value = objective_value self.status = status self.fluxes = fluxes self.reduced_costs = reduced_costs self.shadow_prices = shadow_prices - def __repr__(self): - """String representation of the solution instance.""" + def __repr__(self) -> str: + """Return a string representation of the solution instance.""" if self.status != OPTIMAL: - return "".format(self.status, id(self)) - return "".format(self.objective_value, id(self)) + return f"" + return f"" - def _repr_html_(self): + def _repr_html_(self) -> str: + """Return a rich HTML representation of the solution.""" if self.status == OPTIMAL: - with option_context("display.max_rows", 10): + with pd.option_context("display.max_rows", 10): html = ( "Optimal solution with objective " - "value {:.3f}
{}".format( - self.objective_value, self.to_frame()._repr_html_() - ) + f"value {self.objective_value:.3f}
" + f"{self.to_frame()._repr_html_()}" ) else: - html = "{} solution".format(self.status) + html = f"{self.status} solution" return html - def __getitem__(self, reaction_id): + def __getitem__(self, reaction_id: str) -> float: """ Return the flux of a reaction. Parameters ---------- - reaction : str + reaction_id : str A model reaction ID. - """ - return self.fluxes[reaction_id] - - get_primal_by_id = __getitem__ - - def to_frame(self): - """Return the fluxes and reduced costs as a data frame""" - return DataFrame({"fluxes": self.fluxes, "reduced_costs": self.reduced_costs}) - - -class LegacySolution(object): - """ - Legacy support for an interface to a `cobra.Model` optimization solution. - Attributes - ---------- - f : float - The objective value - solver : str - A string indicating which solver package was used. - x : iterable - List or Array of the fluxes (primal values). - x_dict : dict - A dictionary of reaction IDs that maps to the respective primal values. - y : iterable - List or Array of the dual values. - y_dict : dict - A dictionary of reaction IDs that maps to the respective dual values. - - Warning - ------- - The LegacySolution class and its interface is deprecated. - """ - - def __init__( - self, - f, - x=None, - x_dict=None, - y=None, - y_dict=None, - solver=None, - the_time=0, - status="NA", - **kwargs - ): - """ - Initialize a `LegacySolution` from an objective value. + Returns + ------- + float + The flux of the reaction with ID `reaction_id`. - Parameters - ---------- - f : float - Objective value. - solver : str, optional - A string indicating which solver package was used. - x : iterable, optional - List or Array of the fluxes (primal values). - x_dict : dict, optional - A dictionary of reaction IDs that maps to the respective primal - values. - y : iterable, optional - List or Array of the dual values. - y_dict : dict, optional - A dictionary of reaction IDs that maps to the respective dual - values. - the_time : int, optional - status : str, optional - - .. warning :: deprecated """ - super(LegacySolution, self).__init__(**kwargs) - self.solver = solver - self.f = f - self.x = x - self.x_dict = x_dict - self.status = status - self.y = y - self.y_dict = y_dict - - def __repr__(self): - """String representation of the solution instance.""" - if self.status != "optimal": - return "".format(self.status, id(self)) - return "".format(self.f, id(self)) + return self.fluxes[reaction_id] - def __getitem__(self, reaction_id): - """ - Return the flux of a reaction. + get_primal_by_id = __getitem__ - Parameters - ---------- - reaction_id : str - A reaction ID. - """ - return self.x_dict[reaction_id] + def to_frame(self) -> pd.DataFrame: + """Return the fluxes and reduced costs as a pandas DataFrame. - def dress_results(self, model): - """ - Method could be intended as a decorator. + Returns + ------- + pandas.DataFrame + The fluxes and reduced cost. - .. warning :: deprecated """ - warn("unnecessary to call this deprecated function", DeprecationWarning) + return pd.DataFrame( + {"fluxes": self.fluxes, "reduced_costs": self.reduced_costs} + ) -def get_solution(model, reactions=None, metabolites=None, raise_error=False): +def get_solution( + model: "Model", + reactions: Optional[Iterable["Reaction"]] = None, + metabolites: Optional[Iterable["Metabolite"]] = None, + raise_error: bool = False, +) -> Solution: """ Generate a solution representation of the current solver state. @@ -216,22 +149,19 @@ def get_solution(model, reactions=None, metabolites=None, raise_error=False): model : cobra.Model The model whose reactions to retrieve values for. reactions : list, optional - An iterable of `cobra.Reaction` objects. Uses `model.reactions` by - default. + An iterable of `cobra.Reaction` objects. Uses `model.reactions` + if None (default None). metabolites : list, optional - An iterable of `cobra.Metabolite` objects. Uses `model.metabolites` by - default. + An iterable of `cobra.Metabolite` objects. Uses `model.metabolites` + if None (default None). raise_error : bool - If true, raise an OptimizationError if solver status is not optimal. + If True, raise an OptimizationError if solver status is not optimal + (default False). Returns ------- cobra.Solution - Note - ---- - This is only intended for the `optlang` solver interfaces and not the - legacy solvers. """ check_solver_status(model.solver.status, raise_error=raise_error) if reactions is None: @@ -239,14 +169,14 @@ def get_solution(model, reactions=None, metabolites=None, raise_error=False): if metabolites is None: metabolites = model.metabolites - rxn_index = list() - fluxes = empty(len(reactions)) - reduced = empty(len(reactions)) + rxn_index = [] + fluxes = np.empty(len(reactions)) + reduced = np.empty(len(reactions)) var_primals = model.solver.primal_values - shadow = empty(len(metabolites)) + shadow = np.empty(len(metabolites)) if model.solver.is_integer: - reduced.fill(nan) - shadow.fill(nan) + reduced.fill(np.nan) + shadow.fill(np.nan) for (i, rxn) in enumerate(reactions): rxn_index.append(rxn.id) fluxes[i] = var_primals[rxn.id] - var_primals[rxn.reverse_id] @@ -259,15 +189,15 @@ def get_solution(model, reactions=None, metabolites=None, raise_error=False): rxn_index.append(forward) fluxes[i] = var_primals[forward] - var_primals[reverse] reduced[i] = var_duals[forward] - var_duals[reverse] - met_index = list() + met_index = [] constr_duals = model.solver.shadow_prices for (i, met) in enumerate(metabolites): met_index.append(met.id) shadow[i] = constr_duals[met.id] return Solution( - model.solver.objective.value, - model.solver.status, - Series(index=rxn_index, data=fluxes, name="fluxes"), - Series(index=rxn_index, data=reduced, name="reduced_costs"), - Series(index=met_index, data=shadow, name="shadow_prices"), + objective_value=model.solver.objective.value, + status=model.solver.status, + fluxes=pd.Series(index=rxn_index, data=fluxes, name="fluxes"), + reduced_costs=pd.Series(index=rxn_index, data=reduced, name="reduced_costs"), + shadow_prices=pd.Series(index=met_index, data=shadow, name="shadow_prices"), ) diff --git a/src/cobra/core/species.py b/src/cobra/core/species.py index 68b095d9c..39749fcd0 100644 --- a/src/cobra/core/species.py +++ b/src/cobra/core/species.py @@ -2,7 +2,7 @@ from copy import deepcopy -from typing import TYPE_CHECKING, FrozenSet, Optional, Union +from typing import TYPE_CHECKING, FrozenSet, Optional from ..core.object import Object diff --git a/src/cobra/data/init.py b/src/cobra/data/__init__.py similarity index 100% rename from src/cobra/data/init.py rename to src/cobra/data/__init__.py diff --git a/src/cobra/io/mat.py b/src/cobra/io/mat.py index eaf3f9e43..73f52e417 100644 --- a/src/cobra/io/mat.py +++ b/src/cobra/io/mat.py @@ -698,8 +698,8 @@ def from_mat_struct( for old_field, new_field in zip(old_cobratoolbox_fields, new_cobratoolbox_fields): if old_field in m.dtype.names and new_field not in m.dtype.names: logger.warning( - f"This model seems to have {old_field} instead of {new_field} field. Will use " - f"{old_field} for what {new_field} represents." + f"This model seems to have {old_field} instead of {new_field} field. " + f"Will use {old_field} for what {new_field} represents." ) new_names = list(m.dtype.names) new_names[new_names.index(old_field)] = new_field diff --git a/src/cobra/io/web/cobrapy_repository.py b/src/cobra/io/web/cobrapy_repository.py index 0cb92a223..ab0726fa1 100644 --- a/src/cobra/io/web/cobrapy_repository.py +++ b/src/cobra/io/web/cobrapy_repository.py @@ -51,7 +51,7 @@ def get_sbml(self, model_id: str) -> bytes: A gzip-compressed, UTF-8 encoded SBML document. """ return ( - importlib_resources.files("cobra.data") + importlib_resources.files(cobra.data) .joinpath(f"{model_id}.xml.gz") .read_bytes() ) diff --git a/src/cobra/io/web/load.py b/src/cobra/io/web/load.py index d734fec4c..5244904ac 100644 --- a/src/cobra/io/web/load.py +++ b/src/cobra/io/web/load.py @@ -25,13 +25,16 @@ configuration = Configuration() +DEFAULT_REPOSITORIES = ( + Cobrapy(), + BiGGModels(), + BioModels(), +) + + def load_model( model_id: str, - repositories: Iterable[AbstractModelRepository] = ( - Cobrapy(), - BiGGModels(), - BioModels(), - ), + repositories: Iterable[AbstractModelRepository] = DEFAULT_REPOSITORIES, cache: bool = True, ) -> "Model": """ @@ -155,7 +158,7 @@ def _fetch_model( ) try: return repository.get_sbml(model_id=model_id) - except (FileNotFoundError, OSError): + except OSError: logger.debug( f"Model '{model_id} not found in the local " f"repository {repository.name}.'" diff --git a/src/cobra/util/__init__.py b/src/cobra/util/__init__.py index 7fb765993..e0c33a8ce 100644 --- a/src/cobra/util/__init__.py +++ b/src/cobra/util/__init__.py @@ -3,3 +3,4 @@ from cobra.util.solver import * from cobra.util.util import * from cobra.util.process_pool import * +from cobra.util.compare import * diff --git a/src/cobra/util/compare.py b/src/cobra/util/compare.py new file mode 100644 index 000000000..519a3bc91 --- /dev/null +++ b/src/cobra/util/compare.py @@ -0,0 +1,363 @@ +"""Comparing models, reactions, metabolites, genes and groups.""" + +from typing import ( + TYPE_CHECKING, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + TypeVar, + Union, +) + + +if TYPE_CHECKING: + from cobra.core import DictList, Group, Model, Object, Reaction + + TObject = TypeVar("TObject", bound=Object) + + +def dict_compare( + d1: Dict, d2: Dict, _dont_compare: Optional[set] = None +) -> Dict[str, Union[Set, Dict]]: + """Compare two dictionaries. + + This function will identify overlapping keys, added, removed keys between + dictonaries. If there are identical keys which will not have the same value, they + will be noted as 'modified'. + + Parameters + ---------- + d1: dict + Dictionary to compare. + d2: dict + Dictionary to compare. + _dont_compare: set + Keys that should not be compared. Optional. Default None (compare all keys). + + Returns + ------- + A dicitonary comprised of + "added" - set of attributes present in the first, but not the second + "removed" - set of attributes present in the second, but not the first + "same" - dict of keys prsent in both with the same values + "modified" - dict of keys present in both with different values. Each key will + have a tuple of values in both dictionaries given. + """ + if _dont_compare is None: + _dont_compare = set() + d1_keys = set(d1.keys()).difference(_dont_compare) + d2_keys = set(d2.keys()).difference(_dont_compare) + shared_keys = d1_keys.intersection(d2_keys) + added = d1_keys - d2_keys + removed = d2_keys - d1_keys + modified = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]} + same = {o for o in shared_keys if d1[o] == d2[o]} + return {"added": added, "removed": removed, "modified": modified, "same": same} + + +def compare_state( + obj1: "TObject", obj2: "TObject", ignore_keys: Optional[set] = None +) -> Tuple[bool, Dict]: + """Cmpare two cobra Objects (including subclasses). + + Useful for Metaboite, and Gene Comparison. + Not useful for comparing GPRs(). Use the equality in GPRs() directly. + For Reaction and Group, use the specific functions which do some pre-processing. + + Parameters + ---------- + obj1: Object, Metabolite, Gene + obj2: Object, Metabolite, Gene + ignore_keys: Set, optional + A set of keys to ignore. Defuault None (empty set - all keys will be compared). + + Returns + ------- + Tuple - bool, Dict + A tuple of a boolean (are the two objects equivalent or different) and + a dictionary specifying how they differed. + """ + _is_equivalent = True + if ignore_keys is None: + ignore_keys = set() + _is_equivalent = True + state1 = obj1.__getstate__() + state2 = obj2.__getstate__() + _comparison = dict_compare(state1, state2, ignore_keys) + if _comparison["added"] or _comparison["removed"] or _comparison["modified"]: + _is_equivalent = False + return _is_equivalent, _comparison + + +def compare_reaction_state( + rxn1: "Reaction", rxn2: "Reaction", ignore_keys: Optional[set] = None +) -> Tuple[bool, Dict]: + """Compare two cobra Reactions. + + In order to avoid recursion and disagreement on memory address + genes are transformed to gene.ids + metabolites are transformed to metabolite.ids + + Parameters + ---------- + rxn1: Reaction + rxn2: Reaction + ignore_keys: Set, optional + A set of keys to ignore. Defuault None (empty set - all keys will be compared). + + Returns + ------- + Tuple - bool, Dict + A tuple of a boolean (are the two objects equivalent or different) and a + dictionary specifying how they differed. + """ + if ignore_keys is None: + ignore_keys = set() + _is_equivalent = True + state1 = rxn1.__getstate__() + state1["_metabolites"] = {met.id: stoic for met, stoic in rxn1.metabolites.items()} + state1["_genes"] = {gene.id for gene in rxn1.genes} + state2 = rxn2.__getstate__() + state2["_metabolites"] = {met.id: stoic for met, stoic in rxn2.metabolites.items()} + state2["_genes"] = {gene.id for gene in rxn2.genes} + _comparison = dict_compare(state1, state2, ignore_keys) + if _comparison["added"] or _comparison["removed"] or _comparison["modified"]: + _is_equivalent = False + return _is_equivalent, _comparison + + +def compare_group_state( + group1: "Group", group2: "Group", ignore_keys: Optional[set] = None +) -> Tuple[bool, Dict]: + """Compare two cobra Groups. + + Members are transformed to a list of reaction ids in order to avoid differences in + memory address leading to false positives. + + Parameters + ---------- + group1: Group + group2: Group + ignore_keys: Set, optional + A set of keys to ignore. Defuault None (empty set - all keys will be compared). + + Returns + ------- + Tuple - bool, Dict + A tuple of a boolean (are the two objects equivalent or different) and a + dictionary specifying how they differed. + """ + _is_equivalent = True + state1 = group1.__getstate__() + state2 = group2.__getstate__() + state1["_members"] = group1.members.list_attr("id") + state2["_members"] = group2.members.list_attr("id") + _comparison = dict_compare(state1, state2, ignore_keys) + if _comparison["added"] or _comparison["removed"] or _comparison["modified"]: + _is_equivalent = False + return _is_equivalent, _comparison + + +def compare_dictlists( + dictlist1: "DictList", + dictlist2: "DictList", + ignore_keys: Optional[set] = None, + comparison_function: Optional[Callable] = None, +) -> Tuple[bool, Dict, List]: + """Compare dictlist of objects. Useful when comparing models. + + Will check whether there are objects in dictlist1 that aren't present in dictlist2, + and vice versa. + Objects that appear in both dictlists will be compared using the comparison + function, which allows different functions for reaction or group. The default is + None, which will result in simple compare_state() used as the comparison function. + + Parameters + ---------- + ignore_keys: set, optional + What keys should be ignored. Defualt None. + dictlist1: cobra.DictList + The first dictlist to compare + dictlist2: cobra.DictList + comparison_function: Callable + A function to use for comparing the objects in the dictlists. + + Returns + ------- + Tuple: bool, Dict, List + A tuple of a boolean (are the two dictlists equivalent or different) and a + dictionary specifying how they differed. It also returns a list of ids that + were present in both dictlists, but different. + + See Also + -------- + compare_state() + compare_reaction_state() + compare_group_state() + """ + if comparison_function is None: + comparison_function = compare_state + _is_equivalent = True + comparison = {} + diff_objs = [] + ids_model1 = set(dictlist1.list_attr("id")) + ids_model2 = set(dictlist2.list_attr("id")) + if ids_model1 - ids_model2: + comparison["added"] = ids_model1 - ids_model2 + _is_equivalent = False + if ids_model2 - ids_model1: + comparison["removed"] = ids_model2 - ids_model1 + _is_equivalent = False + for _id in dictlist1.intersection(dictlist2): + obj1 = dictlist1.get_by_id(_id) + obj2 = dictlist2.get_by_id(_id) + _eq, _comparison = comparison_function(obj1, obj2, ignore_keys=ignore_keys) + if not _eq: + _is_equivalent = False + diff_objs.append(_id) + comparison[_id] = _comparison + return _is_equivalent, comparison, diff_objs + + +def compare_model_state( + model1: "Model", + model2: "Model", + ignore_notes: bool = True, + ignore_keys: Optional[set] = None, +) -> Tuple[bool, Dict]: + """Recursively compare model states. + + Will compare the model and then compare metabolites, reactions, genes, groups in + the model. Models will be considered different if any of the objects within the + cobra model are different. + Will ignore the notes field. If you want to compare the notes field, you may need + to run _fix_xml_annotation_to_identifiers and/or fix_for_notes_changes. + + Parameters + ---------- + model1: cobra.Model + Model to compare. + model2: cobra.Model + Other Model to compare. + ignore_notes: bool, optional + Whether or not to ignore the notes field in the model. Default True. + ignore_keys: set, optional + What keys should be ignored. Defualt None. + + Returns + ------- + Tuple - bool, Dict + A tuple of a boolean (are the two models equivalent or different) + and a dictionary specifying how they differed. The dictionary contains + different_x as a list and x for metabolites, reactions, genes, groups. + The differenet_x specifies which comparisons were not equivalent, while the + x contains the full dictionary of comparing each element (each group, + metabolite, reaction, gene). + + See Also + -------- + fix_for_notes_changes() + """ + _is_equivalent = True + if ignore_keys is None: + ignore_keys = set() + if ignore_notes: + ignore_keys = ignore_keys.union({"notes"}) + do_not_compare_models = { + "metabolites", + "reactions", + "genes", + "notes", + "annotation", + "_annotation", + "groups", + "_sbml", # don't care about SBML properties of the file, just how it is read + "_id", # Will often be different based on different files + "_solver", # Will be different memory locations + } + _eq, model_comparison = compare_state(model1, model2, do_not_compare_models) + _is_equivalent = _eq + _eq, comp_result, different_ids = compare_dictlists( + model1.metabolites, model2.metabolites, ignore_keys + ) + model_comparison["metabolites"] = comp_result + model_comparison["different_mets"] = different_ids + _is_equivalent &= _eq + + _eq, comp_result, different_ids = compare_dictlists( + model1.reactions, + model2.reactions, + ignore_keys, + comparison_function=compare_reaction_state, + ) + model_comparison["reactions"] = comp_result + model_comparison["different_reactions"] = different_ids + _is_equivalent &= _eq + + _eq, comp_result, different_ids = compare_dictlists( + model1.genes, model2.genes, ignore_keys + ) + model_comparison["genes"] = comp_result + model_comparison["different_genes"] = different_ids + _is_equivalent &= _eq + + _eq, comp_result, different_ids = compare_dictlists( + model1.groups, + model2.groups, + ignore_keys, + comparison_function=compare_group_state, + ) + model_comparison["genes"] = comp_result + model_comparison["different_genes"] = different_ids + _is_equivalent &= _eq + + return _is_equivalent, model_comparison + + +def fix_for_reaction_notes_changes(diff_dict: Dict, diff_list: List) -> None: + r"""Fix differences caused in reaction Notes when reading and writing models. + + If you wish to compare reaction Notes, there may be some changes that are caused + because the reading and writing functions (in Matlab) do not fully preserve all + the info in the Notes field. + + If you're using this on a model comparison, it should be the two fields + 'reactions' and 'different_reactions'. + + Matlab - reaction Notes may include References as well. These are read as Notes, + outputted to the rxnReferences field if they match the pubmed format + r"PMID: ?\d+" + + Parameters + ---------- + diff_dict: Dict + Dictionary of differences. + diff_list: List + List of reactions that were present in both dictlists/models but had were + different in some fields. + + Examples + -------- + >>>> + >>>> comparison = compare_model_state(model1, model2) + + """ + for key in diff_list: + if "notes" in diff_dict[key]["modified"].keys(): + note_dictionaries = diff_dict[key]["modified"]["notes"] + note_dictionaries[0] = { + k: v + for k, v in note_dictionaries[0].items() + if k != "References" and k != "NOTES" + } + note_dictionaries[1] = { + k: v + for k, v in note_dictionaries[1].items() + if k != "References" and k != "NOTES" + } + if note_dictionaries[0] == note_dictionaries[1]: + diff_list.remove(key) + diff_dict[key]["modified"].__delitem__("notes") diff --git a/tests/conftest.py b/tests/conftest.py index f5c37545b..0833f6d55 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,103 +1,121 @@ """Define global fixtures.""" -from os.path import join from pathlib import Path from pickle import load as _load +from typing import List, Tuple -from cobra import Metabolite, Model, Reaction +import pytest + +from cobra import Metabolite, Model, Reaction, Solution from cobra.io import read_sbml_model from cobra.util import solver as sutil -try: - import pytest - import pytest_benchmark -except ImportError: - pytest = None - - data_dir = Path(__file__).parent / "data" -def create_test_model(model_name="salmonella") -> Model: +def create_test_model(model_name: str = "salmonella") -> Model: """Return a cobra model for testing. + Parameters + ---------- model_name: str One of 'ecoli', 'textbook', or 'salmonella', or the - path to a pickled cobra.Model + path to a pickled cobra.Model . + + Returns + ------- + cobra.Model + The cobra model. + + Raises + ------ + OSError + If no file is found at the path specified by `model_name`. """ if model_name == "ecoli": - ecoli_sbml = str(data_dir / "iJO1366.xml.gz") + ecoli_sbml = str((data_dir / "iJO1366.xml.gz").resolve()) return read_sbml_model(ecoli_sbml) elif model_name == "textbook": - textbook_sbml = join(data_dir, "textbook.xml.gz") + textbook_sbml = str((data_dir / "textbook.xml.gz").resolve()) return read_sbml_model(textbook_sbml) elif model_name == "mini": - mini_sbml = join(data_dir, "mini_fbc2.xml") + mini_sbml = str((data_dir / "mini_fbc2.xml").resolve()) return read_sbml_model(mini_sbml) elif model_name == "salmonella": - salmonella_pickle = join(data_dir, "salmonella.pickle") + salmonella_pickle = str((data_dir / "salmonella.pickle").resolve()) model_name = salmonella_pickle - with open(model_name, "rb") as infile: + with open(model_name, mode="rb") as infile: return _load(infile) @pytest.fixture(scope="session") -def data_directory(): +def data_directory() -> Path: + """Provide session-level fixture for test data directory.""" return data_dir @pytest.fixture(scope="session") -def empty_once(): +def empty_once() -> Model: + """Provide session-level fixture for empty model.""" return Model() @pytest.fixture(scope="function") -def empty_model(empty_once): +def empty_model(empty_once: Model) -> Model: + """Provide function-level fixture for empty model.""" return empty_once.copy() @pytest.fixture(scope="session") -def small_model(): +def small_model() -> Model: + """Provide session-level fixture for textbook model.""" return create_test_model("textbook") @pytest.fixture(scope="function") -def model(small_model): +def model(small_model: Model) -> Model: + """Provide function-level fixture for textbook model.""" return small_model.copy() @pytest.fixture(scope="session") -def large_once(): +def large_once() -> Model: + """Provide session-level fixture for ecoli model.""" return create_test_model("ecoli") @pytest.fixture(scope="function") -def large_model(large_once): +def large_model(large_once: Model) -> Model: + """Provide function-level fixture for ecoli model.""" return large_once.copy() @pytest.fixture(scope="session") -def medium_model(): +def medium_model() -> Model: + """Provide session-level fixture for salmonella model.""" return create_test_model("salmonella") @pytest.fixture(scope="function") -def salmonella(medium_model): +def salmonella(medium_model: Model) -> Model: + """Provide function-level fixture for salmonella model.""" return medium_model.copy() @pytest.fixture(scope="function") -def solved_model(data_directory): +def solved_model(data_directory: Path) -> Tuple[Solution, Model]: + """Provide function-level fixture for solved textbook model.""" model = create_test_model("textbook") - with open(join(data_directory, "textbook_solution.pickle"), "rb") as infile: + with (data_directory / "textbook_solution.pickle").open(mode="rb") as infile: solution = _load(infile) return solution, model @pytest.fixture(scope="session") -def tiny_toy_model(): +def tiny_toy_model() -> Model: + """Provide session-level fixture for tiny toy model.""" tiny = Model("Toy Model") m1 = Metabolite("M1") d1 = Reaction("ex1") @@ -114,12 +132,14 @@ def tiny_toy_model(): @pytest.fixture(params=all_solvers, scope="session") -def opt_solver(request): +def opt_solver(request: pytest.FixtureRequest) -> str: + """Provide session-level fixture for parametrized optlang solver names.""" return request.param @pytest.fixture(scope="function") -def metabolites(model, request): +def metabolites(model: Model, request: pytest.FixtureRequest) -> List[Metabolite]: + """Provide function-level fixture for metabolite set based on `request`.""" if request.param == "exchange": return [ met @@ -139,4 +159,4 @@ def metabolites(model, request): if met.compartment == "c" and "SK_" + met.id not in model.reactions ] else: - raise ValueError("unknown metabolites {}".format(request.param)) + raise ValueError("Unknown metabolites {}".format(request.param)) diff --git a/tests/test_core/conftest.py b/tests/test_core/conftest.py index 40fad4641..808077174 100644 --- a/tests/test_core/conftest.py +++ b/tests/test_core/conftest.py @@ -1,14 +1,16 @@ -# -*- coding: utf-8 -*- +"""Define module level fixtures.""" -"""Module level fixtures""" - -from __future__ import absolute_import +from typing import TYPE_CHECKING, Tuple import pytest from cobra.util.solver import solvers +if TYPE_CHECKING: + from cobra import Model, Solution + + solver_trials = [ "glpk", pytest.param( @@ -27,7 +29,10 @@ @pytest.fixture(scope="function", params=solver_trials) -def solved_model(request, model): +def solved_model( + request: pytest.FixtureRequest, model: "Model" +) -> Tuple["Solution", "Model"]: + """Return solved model.""" model.solver = request.param solution = model.optimize() return solution, model diff --git a/tests/test_core/test_solution.py b/tests/test_core/test_solution.py index 5c2cb77a3..74506c49d 100644 --- a/tests/test_core/test_solution.py +++ b/tests/test_core/test_solution.py @@ -1,17 +1,19 @@ -# -*- coding: utf-8 -*- +"""Test functions of solution.py .""" -"""Test functions of solution.py""" - -from __future__ import absolute_import +from typing import TYPE_CHECKING, Tuple from cobra.core import Solution -def test_solution_contains_only_reaction_specific_values(solved_model): +if TYPE_CHECKING: + from cobra import Model + + +def test_solution_contains_only_reaction_specific_values( + solved_model: Tuple[Solution, "Model"] +) -> None: + """Test solution contains specific reaction values.""" solution, model = solved_model reaction_ids = set([reaction.id for reaction in model.reactions]) - if isinstance(solution, Solution): - assert set(solution.fluxes.index) == reaction_ids - # assert set(solution.reduced_costs.index) == reaction_ids - else: - raise TypeError("solutions of type {0:r} are untested".format(type(solution))) + assert set(solution.fluxes.index) == reaction_ids + # assert set(solution.reduced_costs.index) == reaction_ids diff --git a/tests/test_io/test_annotation.py b/tests/test_io/test_annotation.py index dffeed4d5..f58e267ec 100644 --- a/tests/test_io/test_annotation.py +++ b/tests/test_io/test_annotation.py @@ -1,10 +1,24 @@ -from os.path import join +"""Test model annotations in SBML format.""" + +from pathlib import Path +from typing import TYPE_CHECKING from cobra.io import read_sbml_model, write_sbml_model -def _check_sbml_annotations(model): - """Checks the annotations from the annotation.xml.""" +if TYPE_CHECKING: + from cobra import Model + + +def _check_sbml_annotations(model: "Model") -> None: + """Check the annotations from the annotation.xml. + + Parameters + ---------- + model : cobra.Model + The model to check annotations for. + + """ assert model is not None # model annotation @@ -83,19 +97,31 @@ def _check_sbml_annotations(model): assert annotation["biocyc"] == "META:ACETALD-DEHYDROG-RXN" -def test_read_sbml_annotations(data_directory): - """Test reading and writing annotations.""" - with open(join(data_directory, "annotation.xml"), "r") as f_in: +def test_read_sbml_annotations(data_directory: Path) -> None: + """Test reading and writing annotations. + + data_directory : pathlib.Path + The path to the test data directory. + + """ + with open(data_directory / "annotation.xml", "r") as f_in: model1 = read_sbml_model(f_in) _check_sbml_annotations(model1) -def test_read_write_sbml_annotations(data_directory, tmp_path): - """Test reading and writing annotations.""" - with open(join(data_directory, "annotation.xml"), "r") as f_in: +def test_read_write_sbml_annotations(data_directory: Path, tmp_path: Path) -> None: + """Test reading and writing annotations. + + data_directory : pathlib.Path + The path to the test data directory. + tmp_path : pathlib.Path + The path to the temporary test assets store. + + """ + with open(data_directory / "annotation.xml", "r") as f_in: model1 = read_sbml_model(f_in) - sbml_path = join(str(tmp_path), "test.xml") + sbml_path = tmp_path / "test.xml" with open(sbml_path, "w") as f_out: write_sbml_model(model1, f_out) diff --git a/tests/test_io/test_annotation_format.py b/tests/test_io/test_annotation_format.py index 74b49bdcf..9b9d039cd 100644 --- a/tests/test_io/test_annotation_format.py +++ b/tests/test_io/test_annotation_format.py @@ -1,13 +1,22 @@ -from os.path import join +"""Test model annotations in JSON format.""" + +from pathlib import Path import pytest from cobra.io import load_json_model, write_sbml_model -def test_load_json_model_valid(data_directory, tmp_path): - """Test loading a valid annotation from JSON.""" - path_to_file = join(data_directory, "valid_annotation_format.json") +def test_load_json_model_valid(data_directory: Path, tmp_path: Path) -> None: + """Test loading a valid annotation from JSON. + + data_directory : pathlib.Path + The path to the test data directory. + tmp_path : pathlib.Path + The path to the temporary test assets store. + + """ + path_to_file = data_directory / "valid_annotation_format.json" model = load_json_model(path_to_file) expected = { "bigg.reaction": [["is", "PFK26"]], @@ -16,12 +25,17 @@ def test_load_json_model_valid(data_directory, tmp_path): } for metabolite in model.metabolites: assert metabolite.annotation == expected - path_to_output = join(str(tmp_path), "valid_annotation_output.xml") - write_sbml_model(model, path_to_output) + path_to_output = tmp_path / "valid_annotation_output.xml" + write_sbml_model(model, str(path_to_output.resolve())) + + +def test_load_json_model_invalid(data_directory: Path) -> None: + """Test that loading an invalid annotation from JSON raises TypeError. + data_directory : pathlib.Path + The path to the test data directory. -def test_load_json_model_invalid(data_directory): - """Test that loading an invalid annotation from JSON raises TypeError.""" - path = join(data_directory, "invalid_annotation_format.json") + """ + path = data_directory / "invalid_annotation_format.json" with pytest.raises(TypeError): - model = load_json_model(path) + load_json_model(path) diff --git a/tests/test_io/test_io_order.py b/tests/test_io/test_io_order.py index 5703736ce..2813233a7 100644 --- a/tests/test_io/test_io_order.py +++ b/tests/test_io/test_io_order.py @@ -1,11 +1,10 @@ -# -*- coding: utf-8 -*- - -from __future__ import absolute_import +"""Test functionalities of I/O in an usage order.""" import logging from operator import attrgetter -from os.path import join +from pathlib import Path from random import sample +from typing import Iterable, List import pytest @@ -13,32 +12,32 @@ from cobra import io as cio -LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) @pytest.fixture(scope="module") -def tmp_path_order(tmp_path_factory): - return str(tmp_path_factory.mktemp("model_order")) +def tmp_path_order(tmp_path_factory: Path) -> Path: + """Temporary path for I/O order tests.""" + return tmp_path_factory.mktemp("model_order") @pytest.fixture(scope="module") -def minimized_shuffle(small_model): +def minimized_shuffle(small_model: Model) -> Model: + """Generate a minimal shuffled model for I/O order tests.""" model = small_model.copy() chosen = sample(list(set(model.reactions) - set(model.exchanges)), 10) new = Model("minimized_shuffle") new.add_reactions(chosen) - LOGGER.debug( - "'%s' has %d metabolites, %d reactions, and %d genes.", - new.id, - new.metabolites, - new.reactions, - new.genes, + logger.debug( + f"'{new.id}' has {new.metabolites} metabolites, {new.reactions} reactions, and " + f"{new.genes} genes." ) return new @pytest.fixture(scope="module") -def minimized_sorted(minimized_shuffle): +def minimized_sorted(minimized_shuffle: Model) -> Model: + """Generate a minimal sorted model for I/O order tests.""" model = minimized_shuffle.copy() model.id = "minimized_sorted" model.metabolites = DictList(sorted(model.metabolites, key=attrgetter("id"))) @@ -48,7 +47,8 @@ def minimized_sorted(minimized_shuffle): @pytest.fixture(scope="module") -def minimized_reverse(minimized_shuffle): +def minimized_reverse(minimized_shuffle: Model) -> Model: + """Generate a minimal reversed model for I/O order tests.""" model = minimized_shuffle.copy() model.id = "minimized_reverse" model.metabolites = DictList( @@ -65,16 +65,24 @@ def minimized_reverse(minimized_shuffle): scope="module", params=["minimized_shuffle", "minimized_reverse", "minimized_sorted"], ) -def template(request, minimized_shuffle, minimized_reverse, minimized_sorted): +def template( + request: pytest.FixtureRequest, + minimized_shuffle: Model, + minimized_reverse: Model, + minimized_sorted: Model, +) -> Model: + """Return the cobra Model instances found in the current local symbol table.""" return locals()[request.param] @pytest.fixture(scope="module", params=["metabolites", "reactions", "genes"]) -def attribute(request): +def attribute(request: pytest.FixtureRequest) -> str: + """Return the parameter passed.""" return request.param -def get_ids(iterable): +def _get_ids(iterable: Iterable) -> List[str]: + """Get IDs for elements in `iterable`.""" return [x.id for x in iterable] @@ -86,14 +94,39 @@ def get_ids(iterable): ("load_yaml_model", "save_yaml_model", ".yml"), ], ) -def test_io_order(attribute, read, write, ext, template, tmp_path_order): +def test_io_order( + attribute: str, + read: str, + write: str, + ext: str, + template: Model, + tmp_path_order: Path, +) -> None: + """Test loading and saving of models in order. + + Parameters + ---------- + attribute : str + The attribute of cobra Model to access. + read : str + The function name for loading model, defined as string. + write : str + The function name for saving model, defined as string. + ext : str + The extension of the file format for loading and saving model. + template : cobra.Model + The cobra Model instance to load and save. + tmp_path_order : pathlib.Path + The folder path for storing I/O order test files. + + """ read = getattr(cio, read) write = getattr(cio, write) - filename = join(tmp_path_order, "template" + ext) - write(template, filename) - model = read(filename) - model_elements = get_ids(getattr(model, attribute)) - template_elements = get_ids(getattr(template, attribute)) + file_path = tmp_path_order / f"template{ext}" + write(template, str(file_path.resolve())) + model = read(str(file_path.resolve())) + model_elements = _get_ids(getattr(model, attribute)) + template_elements = _get_ids(getattr(template, attribute)) assert len(model_elements) == len(template_elements) assert set(model_elements) == set(template_elements) assert model_elements == template_elements diff --git a/tests/test_io/test_mat.py b/tests/test_io/test_mat.py index b6d7594e5..4e392e92e 100644 --- a/tests/test_io/test_mat.py +++ b/tests/test_io/test_mat.py @@ -1,13 +1,12 @@ """Test functionalities of I/O in MATLAB (.mat) format.""" -from os.path import join from pathlib import Path from pickle import load from typing import TYPE_CHECKING, Callable import pytest -from cobra import io +from cobra.io import load_matlab_model, read_sbml_model, save_matlab_model try: @@ -17,14 +16,13 @@ if TYPE_CHECKING: - from cobra import Model @pytest.fixture(scope="function") -def raven_model(data_directory: str) -> "Model": +def raven_model(data_directory: Path) -> "Model": """Fixture for RAVEN model.""" - with open(join(data_directory, "raven.pickle"), "rb") as infile: + with open(data_directory / "raven.pickle", "rb") as infile: return load(infile) @@ -41,9 +39,18 @@ def test_load_matlab_model( mini_model: "Model", raven_model: "Model", ) -> None: - """Test the reading of MAT model.""" - mini_mat_model = io.load_matlab_model(join(data_directory, "mini.mat")) - raven_mat_model = io.load_matlab_model(join(data_directory, "raven.mat")) + """Test the reading of MAT model. + + Parameters + ---------- + compare_models : Callable + A callable to compare models. + data_directory : pathlib.Path + The path to the test data directory. + + """ + mini_mat_model = load_matlab_model(str((data_directory / "mini.mat").resolve())) + raven_mat_model = load_matlab_model(str((data_directory / "raven.mat").resolve())) assert compare_models(mini_model, mini_mat_model) is None assert compare_models(raven_model, raven_mat_model) is None @@ -59,24 +66,44 @@ def test_load_matlab_model( def test_save_matlab_model( tmp_path: Path, mini_model: "Model", raven_model: "Model" ) -> None: - """Test the writing of MAT model.""" - mini_output_file = tmp_path.joinpath("mini.mat") - raven_output_file = tmp_path.joinpath("raven.mat") + """Test the writing of MAT model. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the temporary test assets store. + mini_model : cobra.Model + The mini model. + raven_model : cobra.Model + The RAVEN model. + + """ + mini_output_file = tmp_path / "mini.mat" + raven_output_file = tmp_path / "raven.mat" # scipy.io.savemat() doesn't support anything other than # str or file-stream object, hence the str conversion - io.save_matlab_model(mini_model, str(mini_output_file)) - io.save_matlab_model(raven_model, str(raven_output_file)) + save_matlab_model(mini_model, str(mini_output_file.resolve())) + save_matlab_model(raven_model, str(raven_output_file.resolve())) assert mini_output_file.exists() assert raven_output_file.exists() @pytest.mark.skipif(scipy is None, reason="scipy unavailable") def test_large_bounds(tmp_path: Path, model: "Model") -> None: - """Verify that mat bounds don't get broken by the config defaults.""" + """Verify that mat bounds don't get broken by the config defaults. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the temporary test assets store. + model : cobra.Model + The "textbook" model. + + """ model.reactions[0].bounds = -1e6, 1e6 - filepath = str(tmp_path.joinpath("model.mat")) - io.save_matlab_model(model, filepath) - read = io.load_matlab_model(filepath) + filepath = tmp_path / "model.mat" + save_matlab_model(model, str(filepath.resolve())) + read = load_matlab_model(str(filepath.resolve())) assert read.reactions[0].bounds == (-1e6, 1e6) @@ -84,23 +111,34 @@ def test_large_bounds(tmp_path: Path, model: "Model") -> None: def test_read_rewrite_matlab_model( compare_models: Callable, tmp_path: Path, data_directory: Path ) -> None: - """Verify that rewritten matlab model is identical to original.""" - mini_mat_model = io.load_matlab_model(join(data_directory, "mini.mat")) - raven_mat_model = io.load_matlab_model(join(data_directory, "raven.mat")) + """Verify that rewritten matlab model is identical to original. + + Parameters + ---------- + compare_models : Callable + A callable to compare models. + tmp_path : pathlib.Path + The path to the temporary test assets store. + data_directory : pathlib.Path + The path to the test data directory. + + """ + mini_mat_model = load_matlab_model(str((data_directory / "mini.mat").resolve())) + raven_mat_model = load_matlab_model(str((data_directory / "raven.mat").resolve())) mini_output_file = tmp_path.joinpath("mini.mat") raven_output_file = tmp_path.joinpath("raven.mat") # scipy.io.savemat() doesn't support anything other than # str or file-stream object, hence the str conversion - io.save_matlab_model(mini_mat_model, str(mini_output_file)) - io.save_matlab_model(raven_mat_model, str(raven_output_file)) - mini_mat_model_reload = io.load_matlab_model(str(mini_output_file)) - raven_mat_model_reload = io.load_matlab_model(str(raven_output_file)) + save_matlab_model(mini_mat_model, str(mini_output_file)) + save_matlab_model(raven_mat_model, str(raven_output_file)) + mini_mat_model_reload = load_matlab_model(str(mini_output_file)) + raven_mat_model_reload = load_matlab_model(str(raven_output_file)) assert compare_models(mini_mat_model, mini_mat_model_reload) is None assert compare_models(raven_mat_model, raven_mat_model_reload) is None def _fix_xml_annotation_to_identifiers(model: "Model") -> None: - """Some XML models with cobra have annotations that do not match identifiers.org. + """Fix XML annotations to respect identifiers.org . This function will fix the dict keys of annotations to match identifiers.org. Eventually, the XML models should be fixed and cobrapy should be strict, but this is @@ -113,8 +151,9 @@ def _fix_xml_annotation_to_identifiers(model: "Model") -> None: Parameters ---------- - model: Model - A model to fix + model : cobra.Model + The model to fix. + """ for met in model.metabolites: if met.formula == "": @@ -160,36 +199,67 @@ def test_compare_xml_to_written_matlab_model( tmp_path: Path, xml_file: str, ) -> None: - """Verify that xml rewritten as mat file is written and read correctly.""" - xml_model = io.read_sbml_model(join(data_directory, xml_file)) + """Verify that xml rewritten as mat file is written and read correctly. + + Parameters + ---------- + compare_models : Callable + A callable to compare models. + data_directory : pathlib.Path + The path to the test data directory. + tmp_path : pathlib.Path + The path to the temporary test assets store. + xml_file : str + The name of the XML file to compare against. + + """ + xml_model = read_sbml_model(str((data_directory / xml_file).resolve())) _fix_xml_annotation_to_identifiers(xml_model) - mat_output_file = tmp_path.joinpath(xml_file.replace(".xml", ".mat")) - io.save_matlab_model( - xml_model, str(mat_output_file) + mat_output_file = tmp_path / xml_file.replace(".xml", ".mat") + save_matlab_model( + xml_model, str(mat_output_file.resolve()) ) # lac__D_e_boundary confuses the reading of matlab - mat_model = io.load_matlab_model(str(mat_output_file)) + mat_model = load_matlab_model(str(mat_output_file.resolve())) assert compare_models(xml_model, mat_model) is None @pytest.mark.skipif(scipy is None, reason="scipy unavailable") -def test_fail_on_problematic_compartments(data_directory: str) -> None: - """Test that mat import will fail if there are problems in compartments.""" +def test_fail_on_problematic_compartments(data_directory: Path) -> None: + """Test that mat import will fail if there are problems in compartments. + + Parameters + ---------- + data_directory : pathlib.Path + The path to the test data directory. + + """ with pytest.raises(IOError): # AntCore does not have defined compartments - ant_core_model = io.load_matlab_model(join(data_directory, "AntCore.mat")) + load_matlab_model(str((data_directory / "AntCore.mat").resolve())) with pytest.raises(IOError): # Ec_iAF1260_flux1 has underscore in compartment names which is not allowed - Ec_iAF1260_flux1_model = io.load_matlab_model( - join(data_directory, "Ec_iAF1260_flux1.mat") - ) + load_matlab_model(str((data_directory / "Ec_iAF1260_flux1.mat").resolve())) @pytest.mark.skipif(scipy is None, reason="scipy unavailable") def test_mat_model_with_long_compartment_ids( compare_models: Callable, data_directory: Path, tmp_path: Path ) -> None: - """Test that long compartment IDs like "luSI" are correctly loaded.""" - model_compartments = io.load_matlab_model(join(data_directory, "compartments.mat")) + """Test that long compartment IDs like "luSI" are correctly loaded. + + Parameters + ---------- + compare_models : Callable + A callable to compare models. + data_directory : pathlib.Path + The path to the test data directory. + tmp_path : pathlib.Path + The path to the temporary test assets store. + + """ + model_compartments = load_matlab_model( + str((data_directory / "compartments.mat").resolve()) + ) assert model_compartments.compartments == { "csf": "csf", "bcK": "bcK", @@ -209,9 +279,9 @@ def test_mat_model_with_long_compartment_ids( "kegg.compound": ["C00031"], "pubchem.substance": ["3333"], } - output_file = tmp_path.joinpath("compartments.mat") - io.save_matlab_model(model_compartments, str(output_file)) - model_compartments_reloaded = io.load_matlab_model(str(output_file)) + output_file = tmp_path / "compartments.mat" + save_matlab_model(model_compartments, str(output_file.resolve())) + model_compartments_reloaded = load_matlab_model(str(output_file.resolve())) assert compare_models(model_compartments, model_compartments_reloaded) is None @@ -219,14 +289,25 @@ def test_mat_model_with_long_compartment_ids( def test_mat_model_with_no_genes( compare_models: Callable, data_directory: Path, tmp_path: Path ) -> None: - """Test that a model with no genes is loaded and reloaded correctly.""" - model_no_genes = io.load_matlab_model( - join(data_directory, "cardiac_mit_glcuptake_atpmax.mat") + """Test that a model with no genes is loaded and reloaded correctly. + + Parameters + ---------- + compare_models : Callable + A callable to compare models. + data_directory : pathlib.Path + The path to the test data directory. + tmp_path : pathlib.Path + The path to the temporary test assets store. + + """ + model_no_genes = load_matlab_model( + str((data_directory / "cardiac_mit_glcuptake_atpmax.mat").resolve()) ) assert not len(model_no_genes.genes) - output_file = tmp_path.joinpath("cardiac_mit_glcuptake_atpmax.mat") - io.save_matlab_model(model_no_genes, str(output_file)) - model_no_genes_reloaded = io.load_matlab_model(str(output_file)) + output_file = tmp_path / "cardiac_mit_glcuptake_atpmax.mat" + save_matlab_model(model_no_genes, str(output_file.resolve())) + model_no_genes_reloaded = load_matlab_model(str(output_file.resolve())) assert compare_models(model_no_genes, model_no_genes_reloaded) is None @@ -236,10 +317,18 @@ def test_mat_model_wrong_caps(compare_models: Callable, data_directory: Path) -> See https://gist.github.com/akaviaLab/3dcb0eed6563a9d3d1e07198337300ac to create it again when needed. + + Parameters + ---------- + compare_models : Callable + A callable to compare models. + data_directory : pathlib.Path + The path to the test data directory. + """ - mat_model = io.load_matlab_model(join(data_directory, "mini.mat")) - mat_wrong_caps_model = io.load_matlab_model( - join(data_directory, "mini_wrong_key_caps.mat") + mat_model = load_matlab_model(str(Path(data_directory / "mini.mat").resolve())) + mat_wrong_caps_model = load_matlab_model( + str(Path(data_directory, "mini_wrong_key_caps.mat").resolve()) ) assert compare_models(mat_model, mat_wrong_caps_model) is None assert mat_wrong_caps_model.reactions.get_by_id("LDH_D").annotation == { diff --git a/tests/test_io/test_notes.py b/tests/test_io/test_notes.py index a1fbf0b2c..7a15640c0 100644 --- a/tests/test_io/test_notes.py +++ b/tests/test_io/test_notes.py @@ -1,28 +1,37 @@ -from os.path import join +"""Test proper reading of SBML notes.""" -import cobra +from pathlib import Path + +from cobra import Metabolite, Model, Reaction from cobra.io import read_sbml_model, write_sbml_model -def test_notes(tmp_path): - """Testing if model notes are written in SBML.""" - path_to_file = join(str(tmp_path), "model_notes.xml") +def test_notes(tmp_path: Path) -> None: + """Test if model notes are written in SBML. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the temporary test assets store. + + """ + path_to_file = tmp_path / "model_notes.xml" # making a minimal cobra model to test notes - model = cobra.Model("e_coli_core") + model = Model("e_coli_core") model.notes["Remark"] = "...Model Notes..." - met = cobra.Metabolite("pyr_c", compartment="c") + met = Metabolite("pyr_c", compartment="c") model.add_metabolites([met]) met.notes["Remark"] = "Note with \n newline" - rxn = cobra.Reaction("R_ATPM") + rxn = Reaction("R_ATPM") model.add_reactions([rxn]) rxn.notes["Remark"] = "What about me?" model.objective_direction = "max" model.objective = rxn - write_sbml_model(model, path_to_file) + write_sbml_model(model, str(path_to_file.resolve())) # reading the model back - model_after_reading = read_sbml_model(path_to_file) + model_after_reading = read_sbml_model(str(path_to_file.resolve())) met_after_reading = model_after_reading.metabolites.get_by_id("pyr_c") reaction_after_reading = model_after_reading.reactions.get_by_id("R_ATPM") diff --git a/tests/test_io/test_web/test_load.py b/tests/test_io/test_web/test_load.py index abfb4b809..674b7e7b0 100644 --- a/tests/test_io/test_web/test_load.py +++ b/tests/test_io/test_web/test_load.py @@ -1,5 +1,9 @@ +"""Test model loading from local and remote model repositores.""" + import gzip -import pathlib +from pathlib import Path +from typing import TYPE_CHECKING, Callable +from unittest.mock import Mock import pytest @@ -7,15 +11,21 @@ from cobra.io import BiGGModels, BioModels, load_model +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + from cobra import Model + + @pytest.fixture(scope="module") -def mini_sbml(data_directory): +def mini_sbml(data_directory: Path) -> bytes: """Provide a gzip-compressed SBML document.""" - with (pathlib.Path(data_directory) / "mini_cobra.xml").open(mode="rb") as handle: + with (data_directory / "mini_cobra.xml").open(mode="rb") as handle: return gzip.compress(handle.read()) @pytest.fixture -def bigg_models(mini_sbml, mocker): +def bigg_models(mini_sbml: bytes, mocker: "MockerFixture") -> Mock: """Provide a mocked BiGG Models repository interface.""" result = mocker.Mock(spec_set=BiGGModels) result.get_sbml.return_value = mini_sbml @@ -23,30 +33,42 @@ def bigg_models(mini_sbml, mocker): @pytest.fixture -def biomodels(mini_sbml, mocker): +def biomodels(mini_sbml: bytes, mocker: "MockerFixture") -> Mock: """Provide a mocked BioModels repository interface.""" result = mocker.Mock(spec_set=BioModels) result.get_sbml.return_value = mini_sbml return result -def test_bigg_access(bigg_models): - """Test that SBML would be retrieved from the BiGG Models repository.""" +def test_bigg_access(bigg_models: Mock) -> None: + """Test that SBML would be retrieved from the BiGG Models repository. + + Parameters + ---------- + bigg_models : unittest.mock.Mock + The mocked object for BiGG model respository. + + """ load_model("e_coli_core", cache=False, repositories=[bigg_models]) bigg_models.get_sbml.assert_called_once_with(model_id="e_coli_core") -def test_biomodels_access(biomodels): - """Test that SBML would be retrieved from the BioModels repository.""" +def test_biomodels_access(biomodels: Mock) -> None: + """Test that SBML would be retrieved from the BioModels repository. + + Parameters + ---------- + biomodels : unittest.mock.Mock + The mocked object for BioModels model respository. + + """ load_model("BIOMD0000000633", cache=False, repositories=[biomodels]) biomodels.get_sbml.assert_called_once_with(model_id="BIOMD0000000633") -def test_unknown_model(): +def test_unknown_model() -> None: """Expect that a not found error is raised (e2e).""" - with pytest.raises( - RuntimeError, match="could not be found in any of the repositories." - ): + with pytest.raises(RuntimeError): load_model("MODELWHO?", cache=False) @@ -54,15 +76,41 @@ def test_unknown_model(): "model_id, num_metabolites, num_reactions", [("e_coli_core", 72, 95), ("BIOMD0000000633", 50, 35)], ) -def test_remote_load(model_id: str, num_metabolites: int, num_reactions: int): - """Test that sample models can be loaded from remote repositories (e2e).""" +def test_remote_load(model_id: str, num_metabolites: int, num_reactions: int) -> None: + """Test that sample models can be loaded from remote repositories (e2e). + + Parameters + ---------- + model_id : str + The ID of the model. + num_metabolites : int + The total number of metabolites in the model having ID `model_id`. + num_reactions : int + The total number of reactions in the model having ID `model_id`. + + """ model = load_model(model_id, cache=False) assert len(model.metabolites) == num_metabolites assert len(model.reactions) == num_reactions -def test_cache(monkeypatch, tmp_path, bigg_models, biomodels): - """Test that remote models are properly cached.""" +def test_cache( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, bigg_models: Mock, biomodels: Mock +) -> None: + """Test that remote models are properly cached. + + Parameters + ---------- + monkeypatch : pytest.MonkeyPatch + The monkeypatch-ing object. + tmp_path : pathlib.Path + The path to the temporary test assets store. + bigg_models : Mock + The mocked object for BiGG model respository. + biomodels : unittest.mock.Mock + The mocked object for BioModels model respository. + + """ config = Configuration() monkeypatch.setattr(config, "cache_directory", tmp_path) remote_model = load_model("e_coli_core") @@ -73,6 +121,16 @@ def test_cache(monkeypatch, tmp_path, bigg_models, biomodels): assert len(cached_model.reactions) == len(remote_model.reactions) -def test_local_load(model, compare_models): +def test_local_load(model: "Model", compare_models: Callable) -> None: + """Test model loading from local repository. + + Parameters + ---------- + model : cobra.Model + The model to compare local loading against. + compare_models : Callable + A callable object to compare local loading. + + """ model_local = load_model("textbook") compare_models(model, model_local) diff --git a/tests/test_util/test_compare.py b/tests/test_util/test_compare.py new file mode 100644 index 000000000..05375a88d --- /dev/null +++ b/tests/test_util/test_compare.py @@ -0,0 +1,1269 @@ +"""Test Comparing functions in cobra.util.compare.py .""" + + +import numpy as np +import pytest + +from cobra.core import GPR, Metabolite, Model, Reaction +from cobra.util.compare import compare_reaction_state, compare_state + + +def test_reaction_copies_are_equivalent(model: Model) -> None: + """Test that a copy of a reaction will return true when using compare functions.""" + reaction = model.reactions.get_by_id("PGI") + old_reaction = reaction.copy() + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert equivalent + assert comparison["same"] == reaction.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_reaction_with_added_field_is_different(model: Model) -> None: + """Test that reaction with an added field will be identified by compare.""" + reaction = model.reactions.get_by_id("PGI") + old_reaction = reaction.copy() + reaction.blah = None + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["same"] == old_reaction.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == {"blah"} + assert comparison["removed"] == set() + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["same"] == old_reaction.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == {"blah"} + + +def test_reaction_different_ids(model: Model) -> None: + """Test that reactions that differ in ids are not identical.""" + reaction = model.reactions.get_by_id("PGI") + old_reaction = reaction.copy() + reaction.id = "PGI2" + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference({"_id"}) + assert comparison["modified"] == {"_id": ("PGI2", "PGI")} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_reaction_different_names(model: Model) -> None: + """Test that reactions that differ in names are not identical.""" + reaction = model.reactions.get_by_id("PGI") + old_reaction = reaction.copy() + reaction.name = reaction.name + " " + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"name"} + ) + assert comparison["modified"] == { + "name": (old_reaction.name + " ", old_reaction.name) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + reaction.name = None + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"name"} + ) + assert comparison["modified"] == {"name": (None, old_reaction.name)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + reaction.name = "" + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"name"} + ) + assert comparison["modified"] == {"name": ("", old_reaction.name)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + reaction.name = "Test" + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"name"} + ) + assert comparison["modified"] == {"name": ("Test", old_reaction.name)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_reaction_gpr_modification(model: Model) -> None: + """Test reactions are not equivalent after GPR/gene rule manipulations.""" + reaction = model.reactions.get_by_id("PGI") + old_reaction = reaction.copy() + old_gene = list(reaction.genes)[0] + + # Add an existing 'gene' to the GPR + reaction.gene_reaction_rule = "s0001" + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["modified"]["_genes"] == ({"s0001"}, {old_gene.id}) + assert comparison["modified"]["_gpr"] == (reaction.gpr, old_reaction.gpr) + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"_gpr", "_genes"} + ) + + old_reaction = reaction.copy() + old_reaction.gpr = GPR() + equivalent, comparison = compare_reaction_state(reaction, old_reaction) + assert not equivalent + assert comparison["modified"]["_genes"] == ({"s0001"}, set()) + assert comparison["modified"]["_gpr"] == (reaction.gpr, GPR()) + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"_gpr", "_genes"} + ) + + +def test_compare_rxn_bounds(model: Model) -> None: + """Test reaction bounds setting for a scenario.""" + acald_reaction = model.reactions.ACALD + new_reaction = acald_reaction.copy() + new_reaction.bounds = ( + acald_reaction.lower_bound - 100, + acald_reaction.lower_bound - 100, + ) + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"_lower_bound", "_upper_bound"} + ) + assert comparison["modified"] == { + "_lower_bound": (-1000, -1100), + "_upper_bound": (1000, -1100), + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + new_reaction = acald_reaction.copy() + new_reaction.bounds = ( + acald_reaction.upper_bound + 100, + acald_reaction.upper_bound + 100, + ) + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"_lower_bound", "_upper_bound"} + ) + assert comparison["modified"] == { + "_lower_bound": (-1000, 1100), + "_upper_bound": (1000, 1100), + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + new_reaction = acald_reaction.copy() + new_reaction.lower_bound = -100 + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"_lower_bound"} + ) + assert comparison["modified"] == {"_lower_bound": (-1000, -100)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + new_reaction = acald_reaction.copy() + new_reaction.upper_bound = 100 + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"_upper_bound"} + ) + assert comparison["modified"] == {"_upper_bound": (1000, 100)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + new_reaction = acald_reaction.copy() + new_reaction.knock_out() + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"_lower_bound", "_upper_bound"} + ) + assert comparison["modified"] == { + "_lower_bound": (-1000, 0), + "_upper_bound": (1000, 0), + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + new_reaction = acald_reaction.copy() + new_reaction.bounds = (0, 0) + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"_lower_bound", "_upper_bound"} + ) + assert comparison["modified"] == { + "_lower_bound": (-1000, 0), + "_upper_bound": (1000, 0), + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_compare_reaction_different_subsystem(model: Model) -> None: + """Test that differences in subsystem are picked up by compare function.""" + acald_reaction = model.reactions.ACALD + new_reaction = acald_reaction.copy() + new_reaction.subsystem = "Test" + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"subsystem"} + ) + assert comparison["modified"] == {"subsystem": ("", "Test")} + new_reaction = acald_reaction.copy() + new_reaction.subsystem = None + equivalent, comparison = compare_reaction_state(acald_reaction, new_reaction) + assert not equivalent + assert comparison["same"] == set(acald_reaction.__getstate__().keys()).difference( + {"subsystem"} + ) + assert comparison["modified"] == {"subsystem": ("", None)} + + +def test_add_metabolite_comparison(model: Model) -> None: + """Test metabolite addition to a reaction is not equivalent to original reaction.""" + with model: + with model: + reaction = model.reactions.get_by_id("PGI") + old_reaction = reaction.copy() + reaction.add_metabolites({model.metabolites[0]: 1}) + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"_metabolites"} + ) + old_metabolites = {k.id: v for k, v in old_reaction.metabolites.items()} + reaction_metabolites = old_metabolites.copy() + reaction_metabolites.update({model.metabolites[0].id: 1}) + assert comparison["modified"] == { + "_metabolites": (old_metabolites, reaction_metabolites) + } + + fake_metabolite = Metabolite("fake") + reaction.add_metabolites({fake_metabolite: 1}) + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"_metabolites"} + ) + reaction_metabolites.update({"fake": 1}) + assert comparison["modified"] == { + "_metabolites": (old_metabolites, reaction_metabolites) + } + + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()) + + # Test adding by string + with model: + reaction.add_metabolites({"g6p_c": -1}) # already in reaction + reaction_metabolites = {m.id: v for m, v in old_reaction.metabolites.items()} + reaction_metabolites["g6p_c"] = -2 + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["modified"] == { + "_metabolites": (old_metabolites, reaction_metabolites) + } + reaction.add_metabolites({"h_c": 1}) + reaction_metabolites["h_c"] = 1 + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["modified"] == { + "_metabolites": (old_metabolites, reaction_metabolites) + } + + equivalent, _ = compare_reaction_state(old_reaction, reaction) + assert equivalent + + # Test combine=False + reaction = model.reactions.get_by_id("ATPM") + old_reaction = reaction.copy() + old_metabolites = {k.id: v for k, v in old_reaction.metabolites.items()} + reaction_metabolites = old_metabolites.copy() + reaction_metabolites["h2o_c"] = 2.5 + with model: + reaction.add_metabolites({"h2o_c": 2.5}, combine=False) + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["modified"] == { + "_metabolites": (old_metabolites, reaction_metabolites) + } + + # Test adding to a new Reaction + reaction = Reaction("test") + old_reaction = reaction.copy() + reaction.add_metabolites({Metabolite("test_met"): -1}) + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["modified"] == {"_metabolites": ({}, {"test_met": -1})} + + +def test_iadd_reaction_comparison(model: Model) -> None: + """Test in-place addition of reaction is correctly identified by compare.""" + PGI = model.reactions.PGI + PGI_copy = PGI.copy() + EX_h2o = model.reactions.EX_h2o_e + PGI += EX_h2o + PGI_metabolites = {k.id: v for k, v in PGI_copy.metabolites.items()} + PGI_copy_metabolites = PGI_metabolites.copy() + PGI_copy_metabolites[model.metabolites.h2o_e.id] = -1.0 + + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_metabolites"} + ) + assert comparison["modified"] == { + "_metabolites": (PGI_metabolites, PGI_copy_metabolites) + } + # Add a reaction not in the model + new_reaction = Reaction("test") + new_reaction.add_metabolites({Metabolite("A"): -1, Metabolite("B"): 1}) + PGI += new_reaction + PGI_copy_metabolites["A"] = -1 + PGI_copy_metabolites["B"] = 1 + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_metabolites"} + ) + assert comparison["modified"] == { + "_metabolites": (PGI_metabolites, PGI_copy_metabolites) + } + # Combine two GPRs + ACKr_copy = model.reactions.ACKr.copy() + ACKr = model.reactions.ACKr + model.reactions.ACKr += model.reactions.ACONTa + expected_genes = {g.id for g in ACKr.genes} + expected_rule = "(b2296 or b3115 or b1849) and (b0118 or b1276)" + ACKr_metabolites = {m.id: stoic for m, stoic in ACKr_copy.metabolites.items()} + expected_metabolites = ACKr_metabolites.copy() + expected_metabolites.update( + {m.id: stoic for m, stoic in model.reactions.ACONTa.metabolites.items()} + ) + equivalent, comparison = compare_reaction_state(ACKr_copy, ACKr) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_metabolites", "_gpr", "_genes"} + ) + assert comparison["modified"]["_genes"] == ( + {g.id for g in ACKr_copy.genes}, + expected_genes, + ) + assert comparison["modified"]["_gpr"] == ( + ACKr_copy.gpr, + GPR().from_string(expected_rule), + ) + assert comparison["modified"]["_metabolites"] == ( + ACKr_metabolites, + expected_metabolites, + ) + assert comparison["modified"] == { + "_metabolites": (ACKr_metabolites, expected_metabolites), + "_genes": ({g.id for g in ACKr_copy.genes}, expected_genes), + "_gpr": (ACKr_copy.gpr, GPR().from_string(expected_rule)), + } + + +def test_mul_reaction_comparison(model: Model) -> None: + """Test scalar multiplication of factors with a reaction.""" + new = model.reactions.PGI * 2 + PGI = model.reactions.PGI + equivalent, comparison = compare_reaction_state(PGI, new) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_metabolites"} + ) + PGI_metabolites = {m.id: stoic for m, stoic in PGI.metabolites.items()} + new_metabolites = {m.id: stoic * 2 for m, stoic in PGI.metabolites.items()} + assert comparison["modified"] == { + "_metabolites": (PGI_metabolites, new_metabolites) + } + + +def test_sub_reaction_comparison(model: Model) -> None: + """Test reaction subtraction is picked up by comparison function.""" + new = model.reactions.PGI - model.reactions.EX_h2o_e + PGI = model.reactions.PGI + PGI_metabolites = {m.id: stoic for m, stoic in PGI.metabolites.items()} + new_metabolites = PGI_metabolites.copy() + new_metabolites[model.metabolites.h2o_e.id] = 1.0 + equivalent, comparison = compare_reaction_state(PGI, new) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_metabolites"} + ) + assert not equivalent + assert comparison["modified"] == { + "_metabolites": (PGI_metabolites, new_metabolites) + } + + +def test_add_metabolites_combine_true_reaction_comparison(model: Model) -> None: + """Test metabolite addition to reaction (combine = True) with comparison.""" + test_metabolite = Metabolite("test") + for reaction in model.reactions: + old_reaction = reaction.copy() + reaction.add_metabolites({test_metabolite: -66}, combine=True) + reaction_metabolites = { + m.id: stoic for m, stoic in reaction.metabolites.items() + } + old_reaction_metabolites = reaction_metabolites.copy() + old_reaction_metabolites.pop("test") + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert not equivalent + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"_metabolites"} + ) + assert comparison["modified"] == { + "_metabolites": (old_reaction_metabolites, reaction_metabolites) + } + already_included_metabolite = list(reaction.metabolites.keys())[0] + previous_coefficient = reaction.get_coefficient(already_included_metabolite.id) + old_reaction = reaction.copy() + reaction.add_metabolites({already_included_metabolite: 10}, combine=True) + reaction_metabolites = { + m.id: stoic for m, stoic in reaction.metabolites.items() + } + old_reaction_metabolites = reaction_metabolites.copy() + old_reaction_metabolites[already_included_metabolite.id] = previous_coefficient + equivalent, comparison = compare_reaction_state(old_reaction, reaction) + assert comparison["same"] == set(reaction.__getstate__().keys()).difference( + {"_metabolites"} + ) + assert comparison["modified"] == { + "_metabolites": (old_reaction_metabolites, reaction_metabolites) + } + + +def test_reaction_annotation_comparison(model: Model) -> None: + """Test that changes in annotation are picked up by comparison. + + Parameters + ---------- + model: cobra.Model + + + """ + PGI = model.reactions.PGI + PGI_copy = PGI.copy() + PGI_copy_annotation = PGI_copy.annotation + with model: + PGI.annotation = {} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == {"_annotation": (PGI_copy_annotation, {})} + with model: + PGI.annotation = {k: v + " " for k, v in PGI.annotation.items()} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == { + "_annotation": ( + PGI_copy_annotation, + {k: v + " " for k, v in PGI_copy.annotation.items()}, + ) + } + with model: + PGI.annotation["bigg.reaction"] = "Test" + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + PGI_annotation = PGI_copy_annotation + PGI_annotation["bigg.reaction"] = "Test" + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == { + "_annotation": (PGI_copy_annotation, PGI_annotation) + } + + +def test_reaction_notes_comparison(model: Model) -> None: + """Test that notes in reaction can be picked up by comparison function.""" + PGI = model.reactions.PGI + PGI_copy = PGI.copy() + PGI.notes = {"Note": "Test"} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({}, {"Note": "Test"})} + PGI_copy = PGI.copy() + PGI.notes = {"Note": "Test "} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({"Note": "Test"}, {"Note": "Test "})} + PGI.notes = {"Note": "test"} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({"Note": "Test"}, {"Note": "test"})} + PGI.notes = {"note": "Test"} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({"Note": "Test"}, {"note": "Test"})} + PGI.notes = {"Note": "Test", "secondNote": "test"} + equivalent, comparison = compare_reaction_state(PGI_copy, PGI) + assert not equivalent + assert comparison["same"] == set(PGI.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == { + "notes": ( + {"Note": "Test"}, + {"Note": "Test", "secondNote": "test"}, + ) + } + + +def test_reaction_comparison_ignore_keys(model: Model) -> None: + """Test that the ignore_keys field in reaction comparison works as expected.""" + PGI = model.reactions.get_by_id("PGI") + PGI_copy = PGI.copy() + PGI.blah = None + equivalent, comparison = compare_reaction_state(PGI, PGI_copy, ignore_keys={"blah"}) + assert equivalent + assert comparison["same"] == PGI_copy.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert not hasattr(PGI_copy, "blah") + assert hasattr(PGI, "blah") + PGI.__dict__.pop("blah") + + PGI.id = "PGI2" + equivalent, comparison = compare_reaction_state(PGI, PGI_copy, ignore_keys={"_id"}) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference({"_id"}) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.id != PGI_copy.id + PGI.id = PGI_copy.id + + PGI.name = PGI.name + " " + equivalent, comparison = compare_reaction_state(PGI, PGI_copy, ignore_keys={"name"}) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"name"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.name != PGI_copy.name + PGI.name = PGI_copy.name + + with model: + PGI.gene_reaction_rule = "s0001" + equivalent, comparison = compare_reaction_state( + PGI, PGI_copy, ignore_keys={"_genes", "_gpr"} + ) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"_genes", "_gpr"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.gpr != PGI_copy.gpr + + with model: + PGI.bounds = ( + PGI_copy.lower_bound - 100, + PGI_copy.lower_bound - 100, + ) + equivalent, comparison = compare_reaction_state( + PGI, PGI_copy, ignore_keys={"_lower_bound", "_upper_bound"} + ) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"_lower_bound", "_upper_bound"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.bounds != PGI_copy.bounds + + PGI.subsystem = "Test" + equivalent, comparison = compare_reaction_state( + PGI, PGI_copy, ignore_keys={"subsystem"} + ) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"subsystem"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.subsystem != PGI_copy.subsystem + PGI.subsystem = PGI_copy.subsystem + + with model: + PGI.add_metabolites({model.metabolites[0]: 1}) + equivalent, comparison = compare_reaction_state( + PGI, PGI_copy, ignore_keys={"_metabolites"} + ) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"_metabolites"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.metabolites != PGI_copy.metabolites + with model: + PGI.annotation = {} + equivalent, comparison = compare_reaction_state( + PGI, PGI_copy, ignore_keys={"_annotation"} + ) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.annotation != PGI_copy.annotation + with model: + PGI.notes = {"Note": "Test"} + equivalent, comparison = compare_reaction_state( + PGI, PGI_copy, ignore_keys={"notes"} + ) + assert equivalent + assert comparison["same"] == set(PGI_copy.__getstate__().keys()).difference( + {"notes"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert PGI.notes != PGI_copy.notes + + +def test_metabolite_copies_are_equivalent(model: Model) -> None: + """Test that a copy of a metabolite will return True with compare functions.""" + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + equivalent, comparison = compare_state(NADH, NADH_copy) + assert equivalent + assert comparison["same"] == NADH.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_metabolite_with_added_field_is_different(model: Model) -> None: + """Test that metabolites with added fields are picked up by comparison.""" + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH.blah = None + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == NADH_copy.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == {"blah"} + assert comparison["removed"] == set() + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == NADH_copy.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == {"blah"} + + +@pytest.mark.parametrize("field_name", ["_id", "name", "formula", "compartment"]) +def test_metabolite_comparison_different_string_fields( + model: Model, field_name: str +) -> None: + """Test that metabolites that differ in string fields are not identical. + + This function will test id (_id), name, formula, compartment. + + Parameters + ---------- + model: cobra.Model + Model to take metabolites from + field_name: str + Which field to test. + + """ + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH.__setattr__(field_name, NADH.__getattribute__(field_name) + " ") + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ( + NADH_copy.__getattribute__(field_name) + " ", + NADH_copy.__getattribute__(field_name), + ) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.__setattr__(field_name, None) + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: (None, NADH_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.__setattr__(field_name, "") + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("", NADH_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.__setattr__(field_name, "Test") + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("Test", NADH_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.__setattr__(field_name, "C21H27N7O14P") + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("C21H27N7O14P", NADH_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.__setattr__(field_name, "e") + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("e", NADH_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_metabolite_charge_comparison(model: Model) -> None: + """Test that metabolites with different charge are picked up by comparison.""" + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH.charge = 0 + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"charge"}) + assert comparison["modified"] == {"charge": (0, NADH_copy.charge)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.charge = None + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"charge"}) + assert comparison["modified"] == {"charge": (None, NADH_copy.charge)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.charge = 2 + equivalent, comparison = compare_state(NADH, NADH_copy) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"charge"}) + assert comparison["modified"] == {"charge": (2, NADH_copy.charge)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_metabolite_annotation_comparison(model: Model) -> None: + """Test that changes in metabolite annotation are picked up by comparison.""" + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH_copy_annotation = NADH_copy.annotation + with model: + NADH.annotation = {} + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == {"_annotation": (NADH_copy_annotation, {})} + assert comparison["added"] == set() + assert comparison["removed"] == set() + with model: + NADH_annotation = {k: v for k, v in NADH_copy_annotation.items()} + NADH_annotation["bigg.metabolite"] = "Test" + NADH.annotation = NADH_annotation + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == { + "_annotation": (NADH_copy_annotation, NADH_annotation) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + with model: + NADH_annotation = {k: v for k, v in NADH_copy_annotation.items()} + NADH_annotation.pop("biocyc") + NADH.annotation = NADH_annotation + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == { + "_annotation": (NADH_copy_annotation, NADH_annotation) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + with model: + NADH_annotation = {k: v for k, v in NADH_copy_annotation.items()} + NADH_annotation["test"] = "test" + NADH.annotation = NADH_annotation + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == { + "_annotation": (NADH_copy_annotation, NADH_annotation) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + with model: + NADH_annotation = { + "bigg.metabolite": "nadh ", + "biocyc": "NADH ", + "cas": ["58-68-4 "], + "chebi": [ + "CHEBI:13395 ", + "CHEBI:21902 ", + "CHEBI:16908 ", + "CHEBI:7423 ", + "CHEBI:44216 ", + "CHEBI:57945 ", + "CHEBI:13396 ", + ], + "hmdb": "HMDB01487 ", + "kegg.compound": "C00004 ", + "pubchem.substance": "3306 ", + "reactome": [ + "REACT_192305 ", + "REACT_73473 ", + "REACT_194697 ", + "REACT_29362 ", + ], + "seed.compound": "cpd00004 ", + "unipathway.compound": "UPC00004 ", + } + NADH.annotation = NADH_annotation + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == { + "_annotation": (NADH_copy_annotation, NADH_annotation) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_metabolite_notes_comparison(model: Model) -> None: + """Test that notes in Metabolite can be picked up by comparison function.""" + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH.notes = {"Note": "Test"} + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({}, {"Note": "Test"})} + NADH_copy = NADH.copy() + NADH.notes = {"Note": "Test "} + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({"Note": "Test"}, {"Note": "Test "})} + NADH.notes = {"Note": "test"} + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({"Note": "Test"}, {"Note": "test"})} + NADH.notes = {"note": "Test"} + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {"notes": ({"Note": "Test"}, {"note": "Test"})} + NADH.notes = {"Note": "Test", "secondNote": "test"} + equivalent, comparison = compare_state(NADH_copy, NADH) + assert not equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == { + "notes": ( + {"Note": "Test"}, + {"Note": "Test", "secondNote": "test"}, + ) + } + + +def test_metabolite_comparison_ignore_keys(model: Model) -> None: + """Test that the ignore_keys field in Metabolite comparison works as expected.""" + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH.blah = None + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={"blah"}) + assert equivalent + assert comparison["same"] == NADH_copy.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + NADH.__dict__.pop("blah") + + NADH.charge = 0 + equivalent, comparison = compare_state(NADH, NADH_copy, {"charge"}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"charge"}) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.charge != NADH_copy.charge + NADH.charge = NADH_copy.charge + + NADH_copy_annotation = NADH_copy.annotation + with model: + NADH_annotation = {k: v for k, v in NADH_copy_annotation.items()} + NADH_annotation["bigg.metabolite"] = "Test" + NADH.annotation = NADH_annotation + equivalent, comparison = compare_state( + NADH_copy, NADH, ignore_keys={"_annotation"} + ) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.annotation != NADH_copy.annotation + with model: + NADH_annotation = {k: v for k, v in NADH_copy_annotation.items()} + NADH_annotation.pop("biocyc") + NADH.annotation = NADH_annotation + equivalent, comparison = compare_state( + NADH_copy, NADH, ignore_keys={"_annotation"} + ) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {"_annotation"} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.annotation != NADH_copy.annotation + + NADH.notes = {"Note": "Test"} + equivalent, comparison = compare_state(NADH_copy, NADH, ignore_keys={"notes"}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference({"notes"}) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.notes != NADH_copy.notes + + +@pytest.mark.parametrize("field_name", ["_id", "name", "formula", "compartment"]) +def test_metabolite_comparison_ignore_keys_different_string_fields( + model: Model, field_name: str +) -> None: + """Test that ignore keys works on string fields in metaoblites. + + This function will test id (_id), name, formula, compartment. + + Parameters + ---------- + model: cobra.Model + Model to take metabolites from + field_name: str + Which field to test. + + """ + NADH = model.metabolites.get_by_id("nadh_c") + NADH_copy = NADH.copy() + NADH.__setattr__(field_name, NADH.__getattribute__(field_name) + " ") + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={field_name}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.__getattribute__(field_name) != NADH_copy.__getattribute__(field_name) + NADH.__setattr__(field_name, None) + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={field_name}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.__getattribute__(field_name) != NADH_copy.__getattribute__(field_name) + NADH.__setattr__(field_name, "") + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={field_name}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.__getattribute__(field_name) != NADH_copy.__getattribute__(field_name) + NADH.__setattr__(field_name, "Test") + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={field_name}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.__getattribute__(field_name) != NADH_copy.__getattribute__(field_name) + NADH.__setattr__(field_name, "C21H27N7O14P") + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={field_name}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.__getattribute__(field_name) != NADH_copy.__getattribute__(field_name) + NADH.__setattr__(field_name, "e") + equivalent, comparison = compare_state(NADH, NADH_copy, ignore_keys={field_name}) + assert equivalent + assert comparison["same"] == set(NADH.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + assert NADH.__getattribute__(field_name) != NADH_copy.__getattribute__(field_name) + + +def test_gene_copies_are_identical(model: Model) -> None: + """Test that gene copies are considered identical.""" + b1241 = model.genes[0] + b1241_copy = b1241.copy() + equivalent, comparison = compare_state(b1241, b1241_copy) + assert equivalent + assert comparison["same"] == b1241.__getstate__().keys() + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_gene_with_added_field_are_different(model: Model) -> None: + """Test that gene comparison identifies genes with different fields.""" + b1241 = model.genes[0] + b1241_copy = b1241.copy() + b1241.blah = "Test" + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference({"blah"}) + assert comparison["modified"] == {} + assert comparison["added"] == {"blah"} + assert comparison["removed"] == set() + equivalent, comparison = compare_state(b1241_copy, b1241) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference({"blah"}) + assert comparison["modified"] == {} + assert comparison["added"] == set() + assert comparison["removed"] == {"blah"} + + +@pytest.mark.parametrize("field_name", ["_id", "name"]) +def test_gene_comparison_different_string_fields(model: Model, field_name: str) -> None: + """Test that genes that differ in string fields are not identical. + + This function will test id (_id), name. + + Parameters + --------`-- + model: cobra.Model + Model to take genes from + field_name: str + Which field to test. + + """ + b1241 = model.genes[0] + b1241_copy = b1241.copy() + b1241.__setattr__(field_name, b1241.__getattribute__(field_name) + " ") + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ( + b1241_copy.__getattribute__(field_name) + " ", + b1241_copy.__getattribute__(field_name), + ) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + b1241.__setattr__(field_name, None) + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: (None, b1241_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + b1241.__setattr__(field_name, "") + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("", b1241_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + b1241.__setattr__(field_name, "Test") + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("Test", b1241_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + b1241.__setattr__(field_name, "C21H27N7O14P") + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("C21H27N7O14P", b1241_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + b1241.__setattr__(field_name, "e") + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {field_name} + ) + assert comparison["modified"] == { + field_name: ("e", b1241_copy.__getattribute__(field_name)) + } + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +def test_gene_comparison_functional(model: Model) -> None: + """Test that genes that differ in functional are not identical.""" + b1241 = model.genes[0] + b1241_copy = b1241.copy() + b1241.functional = False + equivalent, comparison = compare_state(b1241, b1241_copy) + assert not equivalent + assert comparison["same"] == set(b1241.__getstate__().keys()).difference( + {"_functional"} + ) + assert comparison["modified"] == {"_functional": (False, True)} + assert comparison["added"] == set() + assert comparison["removed"] == set() + + +## Test model +def test_add(model: Model) -> None: + """Test reaction addition to model.""" + # Not in place addition should work on a copy + new = model.reactions.PGI + model.reactions.EX_h2o_e + assert new._model is not model + assert len(new.metabolites) == 3 + # The copy should refer to different metabolites and genes + # This currently fails because add_metabolites does not copy. + # Should that be changed? + # for met in new.metabolites: + # assert met is not model.metabolites.get_by_id(met.id) + # assert met.model is not model + for gene in new.genes: + assert gene is not model.genes.get_by_id(gene.id) + assert gene.model is not model + + +def test_removal_from_model_retains_bounds(model: Model) -> None: + """Test reaction removal from a model, retains its bounds.""" + model_cp = model.copy() + reaction = model_cp.reactions.ACALD + assert reaction.model == model_cp + assert reaction.lower_bound == -1000.0 + assert reaction.upper_bound == 1000.0 + assert reaction._lower_bound == -1000.0 + assert reaction._upper_bound == 1000.0 + model_cp.remove_reactions([reaction]) + assert reaction.model is None + assert reaction.lower_bound == -1000.0 + assert reaction.upper_bound == 1000.0 + assert reaction._lower_bound == -1000.0 + assert reaction._upper_bound == 1000.0 + + +def test_remove_from_model(model: Model) -> None: + """Test reaction removal from model.""" + pgi = model.reactions.PGI + g6p = model.metabolites.g6p_c + pgi_flux = model.optimize().fluxes["PGI"] + assert abs(pgi_flux) > 1e-6 + + with model: + pgi.remove_from_model() + assert pgi.model is None + assert "PGI" not in model.reactions + assert pgi.id not in model.variables + assert pgi.reverse_id not in model.variables + assert pgi not in g6p.reactions + model.optimize() + + assert "PGI" in model.reactions + assert pgi.id in model.variables + assert pgi.reverse_id in model.variables + assert pgi.forward_variable.problem is model.solver + assert pgi in g6p.reactions + assert g6p in pgi.metabolites + assert np.isclose(pgi_flux, model.optimize().fluxes["PGI"])