
Commit

Merge pull request #884 from MolSSI/ingest
External Data Ingestion Part 1
bennybp authored Jan 23, 2025
2 parents 830a31f + 52fb943 commit f9b251a
Showing 13 changed files with 371 additions and 190 deletions.
Binary file not shown.
qcfractal/qcfractal/components/gridoptimization/testing_helpers.py
@@ -7,7 +7,7 @@
import pydantic.v1 as pydantic
except ImportError:
import pydantic
from qcelemental.models import Molecule, FailedOperation, ComputeError, OptimizationResult
from qcelemental.models import Molecule, FailedOperation, ComputeError, OptimizationResult as QCEl_OptimizationResult
from qcelemental.models.procedures import OptimizationProtocols

from qcarchivetesting.helpers import read_record_data
@@ -101,13 +101,13 @@ def generate_task_key(task: RecordTask):
return mol_hash + "|" + constraints_str


def load_test_data(name: str) -> Tuple[GridoptimizationSpecification, Molecule, Dict[str, OptimizationResult]]:
def load_test_data(name: str) -> Tuple[GridoptimizationSpecification, Molecule, Dict[str, QCEl_OptimizationResult]]:
test_data = read_record_data(name)

return (
pydantic.parse_obj_as(GridoptimizationSpecification, test_data["specification"]),
pydantic.parse_obj_as(Molecule, test_data["initial_molecule"]),
pydantic.parse_obj_as(Dict[str, OptimizationResult], test_data["results"]),
pydantic.parse_obj_as(Dict[str, QCEl_OptimizationResult], test_data["results"]),
)


@@ -116,7 +116,7 @@ def submit_test_data(
name: str,
tag: Optional[str] = "*",
priority: PriorityEnum = PriorityEnum.normal,
) -> Tuple[int, Dict[str, OptimizationResult]]:
) -> Tuple[int, Dict[str, QCEl_OptimizationResult]]:
input_spec, molecule, result = load_test_data(name)
meta, record_ids = storage_socket.records.gridoptimization.add(
[molecule], input_spec, tag, priority, None, None, True
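
The same renaming recurs in every testing-helper module touched by this commit: the QCElemental result models are now imported under a QCEl_ prefix. A reasonable reading (not stated in the commit) is that this keeps them visually distinct from QCPortal/QCFractal classes of the same name once both appear in one module. A condensed, illustrative sketch of the pattern, not taken verbatim from any one file:

# pydantic compatibility shim kept from the original files: prefer the
# pydantic v1 API bundled with pydantic 2.x, fall back to pydantic 1.x itself
try:
    import pydantic.v1 as pydantic
except ImportError:
    import pydantic

# QCElemental models aliased with a QCEl_ prefix so their origin is explicit
from qcelemental.models import (
    Molecule,
    AtomicResult as QCEl_AtomicResult,
    OptimizationResult as QCEl_OptimizationResult,
)

# parse_obj_as mirrors how the load_test_data helpers build models from dicts
mol = pydantic.parse_obj_as(Molecule, {"symbols": ["He"], "geometry": [0.0, 0.0, 0.0]})
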
8 changes: 4 additions & 4 deletions qcfractal/qcfractal/components/manybody/testing_helpers.py
@@ -6,7 +6,7 @@
import pydantic.v1 as pydantic
except ImportError:
import pydantic
from qcelemental.models import Molecule, FailedOperation, ComputeError, AtomicResult
from qcelemental.models import Molecule, FailedOperation, ComputeError, AtomicResult as QCEl_AtomicResult

from qcarchivetesting.helpers import read_record_data
from qcfractal.components.manybody.record_db_models import ManybodyRecordORM
@@ -95,13 +95,13 @@ def generate_task_key(task: RecordTask):
return "singlepoint" + "|" + mol_hash


def load_test_data(name: str) -> Tuple[ManybodySpecification, Molecule, Dict[str, AtomicResult]]:
def load_test_data(name: str) -> Tuple[ManybodySpecification, Molecule, Dict[str, QCEl_AtomicResult]]:
test_data = read_record_data(name)

return (
pydantic.parse_obj_as(ManybodySpecification, test_data["specification"]),
pydantic.parse_obj_as(Molecule, test_data["molecule"]),
pydantic.parse_obj_as(Dict[str, AtomicResult], test_data["results"]),
pydantic.parse_obj_as(Dict[str, QCEl_AtomicResult], test_data["results"]),
)


@@ -110,7 +110,7 @@ def submit_test_data(
name: str,
tag: Optional[str] = "*",
priority: PriorityEnum = PriorityEnum.normal,
) -> Tuple[int, Dict[str, AtomicResult]]:
) -> Tuple[int, Dict[str, QCEl_AtomicResult]]:
input_spec, molecule, result = load_test_data(name)
meta, record_ids = storage_socket.records.manybody.add([molecule], input_spec, tag, priority, None, None, True)
assert meta.success
16 changes: 12 additions & 4 deletions qcfractal/qcfractal/components/neb/testing_helpers.py
@@ -6,9 +6,15 @@
import pydantic.v1 as pydantic
except ImportError:
import pydantic
from qcelemental.models import Molecule, FailedOperation, ComputeError, AtomicResult, OptimizationResult

from qcarchivetesting.helpers import read_record_data
from qcelemental.models import (
Molecule,
FailedOperation,
ComputeError,
AtomicResult as QCEl_AtomicResult,
OptimizationResult as QCEl_OptimizationResult,
)

from qcfractal.components.neb.record_db_models import NEBRecordORM
from qcfractal.testing_helpers import run_service
from qcportal.generic_result import GenericTaskResult
@@ -91,14 +97,16 @@ def generate_task_key(task: RecordTask):

def load_test_data(
name: str,
) -> Tuple[NEBSpecification, List[Molecule], Dict[str, Union[AtomicResult, OptimizationResult, GenericTaskResult]]]:
) -> Tuple[
NEBSpecification, List[Molecule], Dict[str, Union[QCEl_AtomicResult, QCEl_OptimizationResult, GenericTaskResult]]
]:
test_data = read_record_data(name)

return (
pydantic.parse_obj_as(NEBSpecification, test_data["specification"]),
pydantic.parse_obj_as(List[Molecule], test_data["initial_chain"]),
pydantic.parse_obj_as(
Dict[str, Union[AtomicResult, OptimizationResult, GenericTaskResult]], test_data["results"]
Dict[str, Union[QCEl_AtomicResult, QCEl_OptimizationResult, GenericTaskResult]], test_data["results"]
),
)

71 changes: 71 additions & 0 deletions qcfractal/qcfractal/components/optimization/record_socket.py
@@ -20,6 +20,7 @@
)
from qcportal.record_models import PriorityEnum, RecordStatusEnum
from qcportal.serialization import convert_numpy_recursive
from qcportal.singlepoint import QCSpecification
from qcportal.singlepoint import (
SinglepointDriver,
)
@@ -148,6 +149,76 @@ def update_completed_task(self, session: Session, record_id: int, result: QCEl_O
stmt = update(OptimizationRecordORM).where(OptimizationRecordORM.id == record_id).values(record_updates)
session.execute(stmt)

def insert_complete_schema_v1(
self,
session: Session,
results: Sequence[QCEl_OptimizationResult],
) -> List[OptimizationRecordORM]:

ret = []

initial_mols = []
final_mols = []
opt_specs = []

for result in results:
initial_mols.append(result.initial_molecule)
final_mols.append(result.final_molecule)

# in v1 of the schema, the qc program is stored as a keyword
qc_program = result.keywords.pop("program", "")

qc_spec = QCSpecification(
program=qc_program,
driver=result.input_specification.driver,
method=result.input_specification.model.method,
basis=result.input_specification.model.basis,
keywords=result.input_specification.keywords,
# no protocols allowed in v1 of the schema
)

opt_spec = OptimizationSpecification(
program=result.provenance.creator.lower(),
qc_specification=qc_spec,
keywords=result.keywords,
protocols=result.protocols,
)

opt_specs.append(opt_spec)

meta, spec_ids = self.root_socket.records.optimization.add_specifications(opt_specs, session=session)
if not meta.success:
raise RuntimeError("Aborted optimization insertion - could not add specifications: " + meta.error_string)

meta, initial_mol_ids = self.root_socket.molecules.add(initial_mols, session=session)
if not meta.success:
raise RuntimeError("Aborted optimization insertion - could not add initial molecules: " + meta.error_string)

meta, final_mol_ids = self.root_socket.molecules.add(final_mols, session=session)
if not meta.success:
raise RuntimeError("Aborted optimization insertion - could not add final molecules: " + meta.error_string)

for result, initial_mol_id, final_mol_id, spec_id in zip(results, initial_mol_ids, final_mol_ids, spec_ids):
record_orm = OptimizationRecordORM(
specification_id=spec_id,
initial_molecule_id=initial_mol_id,
final_molecule_id=final_mol_id,
energies=result.energies,
status=RecordStatusEnum.complete,
)

if result.trajectory:
trajectory_ids = self.root_socket.records.insert_complete_schema_v1(session, result.trajectory)
opt_traj_orm = [
OptimizationTrajectoryORM(singlepoint_id=tid, position=idx)
for idx, tid in enumerate(trajectory_ids)
]
record_orm.trajectory = opt_traj_orm

ret.append(record_orm)

return ret

def add_specifications(
self, opt_specs: Sequence[OptimizationSpecification], *, session: Optional[Session] = None
) -> Tuple[InsertMetadata, List[int]]:
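
insert_complete_schema_v1 is the core of this first ingestion step: it takes already-completed QCElemental OptimizationResult objects (QCSchema v1), rebuilds the optimization and QC specifications (in schema v1 the QC program lives in result.keywords["program"]), registers the initial and final molecules, creates OptimizationRecordORM rows marked complete, and recursively inserts any trajectory singlepoints through the top-level records socket. A minimal usage sketch, assuming an existing SQLAlchemySocket named storage_socket; the file name and parse_file call are illustrative, not part of the commit:

from qcelemental.models import OptimizationResult

# Load a finished optimization from disk (hypothetical file)
result = OptimizationResult.parse_file("benzene_opt_result.json")

with storage_socket.session_scope() as session:
    # The top-level records socket dispatches on the schema type; for an
    # OptimizationResult it lands in the method added above. Trajectory
    # singlepoints are inserted as their own records and linked by position.
    new_ids = storage_socket.records.insert_complete_schema_v1(session, [result])

print("inserted optimization record id:", new_ids[0])
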
131 changes: 83 additions & 48 deletions qcfractal/qcfractal/components/optimization/test_record_socket.py
@@ -7,8 +7,8 @@
from qcarchivetesting import load_molecule_data
from qcfractal.components.optimization.record_db_models import OptimizationRecordORM
from qcfractal.components.optimization.testing_helpers import test_specs, load_test_data, run_test_data
from qcfractal.components.testing_helpers import convert_to_plain_qcschema_result
from qcfractal.db_socket import SQLAlchemySocket
from qcportal.compression import decompress
from qcportal.managers import ManagerName
from qcportal.molecules import Molecule
from qcportal.optimization import (
@@ -21,13 +21,56 @@
SinglepointProtocols,
)
from qcportal.utils import now_at_utc
from ..record_socket import build_extras_properties

if TYPE_CHECKING:
from qcfractal.db_socket import SQLAlchemySocket
from sqlalchemy.orm.session import Session
from typing import List, Dict


def _compare_record_with_schema(record_orm, result_schema):
assert record_orm.status == RecordStatusEnum.complete
assert record_orm.specification.program == result_schema.provenance.creator.lower()

kw_no_prog = result_schema.keywords.copy()
kw_no_prog["program"] = result_schema.keywords["program"]
assert kw_no_prog == result_schema.keywords

# The singlepoint spec
assert record_orm.specification.qc_specification.program == result_schema.keywords["program"]
assert record_orm.specification.qc_specification.method == result_schema.input_specification.model.method
assert record_orm.specification.qc_specification.basis == result_schema.input_specification.model.basis
assert record_orm.specification.qc_specification.keywords == result_schema.input_specification.keywords

assert len(record_orm.compute_history) == 1
assert record_orm.compute_history[0].status == RecordStatusEnum.complete
assert record_orm.compute_history[0].provenance == result_schema.provenance

# Test the trajectory
assert len(record_orm.trajectory) == len(result_schema.trajectory)
for db_traj, res_traj in zip(record_orm.trajectory, result_schema.trajectory):
assert db_traj.singlepoint_record.specification.program == res_traj.provenance.creator.lower()
assert db_traj.singlepoint_record.specification.basis == res_traj.model.basis
assert db_traj.singlepoint_record.molecule.identifiers["molecule_hash"] == res_traj.molecule.get_hash()

# Use plain schema, where compressed stuff is removed
new_extras, new_properties = build_extras_properties(result_schema.copy(deep=True))
assert record_orm.properties == new_properties
assert record_orm.extras == new_extras

# TODO - eventually schema may have these
assert record_orm.native_files == {}

for k in ("stdout", "stderr", "error"):
plain_output = getattr(result_schema, k)
if plain_output is not None:
out_str = record_orm.compute_history[0].outputs[k].get_output()
assert out_str == plain_output
else:
assert k not in record_orm.compute_history[0].outputs


@pytest.mark.parametrize("spec", test_specs)
def test_optimization_socket_task_spec(
storage_socket: SQLAlchemySocket,
@@ -240,50 +283,42 @@ def test_optimization_socket_run(

for rec_id, result in zip(all_id, all_results):
record = session.get(OptimizationRecordORM, rec_id)
assert record.status == RecordStatusEnum.complete
assert record.specification.program == result.provenance.creator.lower()

kw_no_prog = result.keywords.copy()
kw_no_prog["program"] = result.keywords["program"]
assert kw_no_prog == result.keywords

# The singlepoint spec
assert record.specification.qc_specification.program == result.keywords["program"]
assert record.specification.qc_specification.method == result.input_specification.model.method
assert record.specification.qc_specification.basis == result.input_specification.model.basis
assert record.specification.qc_specification.keywords == result.input_specification.keywords

assert len(record.compute_history) == 1
assert record.compute_history[0].status == RecordStatusEnum.complete
assert record.compute_history[0].provenance == result.provenance

desc_info = storage_socket.records.get_short_descriptions([rec_id])[0]
short_desc = desc_info["description"]
assert desc_info["record_type"] == record.record_type
assert desc_info["created_on"] == record.created_on
assert record.specification.program in short_desc
assert record.specification.qc_specification.program in short_desc
assert record.specification.qc_specification.method in short_desc

outs = record.compute_history[0].outputs

avail_outputs = set(outs.keys())
result_outputs = {x for x in ["stdout", "stderr", "error"] if getattr(result, x, None) is not None}
compressed_outputs = result.extras.get("_qcfractal_compressed_outputs", {})
result_outputs |= set(compressed_outputs.keys())
assert avail_outputs == result_outputs

# NOTE - this only works for string outputs (not dicts)
# but those are used for errors, which aren't covered here
for out in outs.values():
o_str = out.get_output()
co = result.extras["_qcfractal_compressed_outputs"][out.output_type]
ro = decompress(co["data"], co["compression_type"])
assert o_str == ro

# Test the trajectory
assert len(record.trajectory) == len(result.trajectory)
for db_traj, res_traj in zip(record.trajectory, result.trajectory):
assert db_traj.singlepoint_record.specification.program == res_traj.provenance.creator.lower()
assert db_traj.singlepoint_record.specification.basis == res_traj.model.basis
assert db_traj.singlepoint_record.molecule.identifiers["molecule_hash"] == res_traj.molecule.get_hash()

plain_result = convert_to_plain_qcschema_result(result)
_compare_record_with_schema(record, plain_result)


def test_optimization_socket_insert_complete_schema_v1(storage_socket: SQLAlchemySocket, session: Session):
test_names = [
"opt_psi4_benzene",
"opt_psi4_fluoroethane_notraj",
"opt_psi4_methane",
"opt_psi4_methane_sometraj",
]

all_ids = []

for test_name in test_names:
_, _, result_schema = load_test_data(test_name)

plain_schema = convert_to_plain_qcschema_result(result_schema)

# Need a full copy of results - they can get mutated
with storage_socket.session_scope() as session2:
ins_ids_1 = storage_socket.records.insert_complete_schema_v1(session2, [result_schema.copy(deep=True)])
ins_ids_2 = storage_socket.records.insert_complete_schema_v1(session2, [plain_schema.copy(deep=True)])

ins_id_1 = ins_ids_1[0]
ins_id_2 = ins_ids_2[0]

# insert_complete_schema always inserts
assert ins_id_1 != ins_id_2
assert ins_id_1 not in all_ids
assert ins_id_2 not in all_ids
all_ids.extend([ins_id_1, ins_id_2])

rec_1 = session.get(OptimizationRecordORM, ins_id_1)
rec_2 = session.get(OptimizationRecordORM, ins_id_2)

_compare_record_with_schema(rec_1, plain_schema)
_compare_record_with_schema(rec_2, plain_schema)
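
convert_to_plain_qcschema_result comes from qcfractal.components.testing_helpers and is not part of this diff. Judging from the assertions it replaces (which manually decompressed extras["_qcfractal_compressed_outputs"]) and from the comment "Use plain schema, where compressed stuff is removed", it presumably expands the compressed outputs back into ordinary QCSchema fields so a record can be compared field-for-field. A rough sketch of that assumed behavior only; the real helper may differ:

from qcportal.compression import decompress

def plain_qcschema_result_sketch(result):
    # NOT the actual helper -- an approximation of what this test appears to rely on
    plain = result.copy(deep=True)
    compressed = plain.extras.pop("_qcfractal_compressed_outputs", {})
    updates = {}
    for output_type, co in compressed.items():  # "stdout", "stderr", ...
        updates[output_type] = decompress(co["data"], co["compression_type"])
    # pydantic v1 models are immutable by default; copy(update=...) swaps fields
    return plain.copy(update=updates)
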
15 changes: 10 additions & 5 deletions qcfractal/qcfractal/components/optimization/testing_helpers.py
@@ -6,9 +6,14 @@
import pydantic.v1 as pydantic
except ImportError:
import pydantic
from qcelemental.models import Molecule, FailedOperation, ComputeError, OptimizationResult

from qcarchivetesting.helpers import read_record_data
from qcelemental.models import (
Molecule,
FailedOperation,
ComputeError,
OptimizationResult as QCEl_OptimizationResult,
)

from qcfractal.components.optimization.record_db_models import OptimizationRecordORM
from qcfractalcompute.compress import compress_result
from qcportal.optimization import OptimizationSpecification
@@ -71,13 +76,13 @@
]


def load_test_data(name: str) -> Tuple[OptimizationSpecification, Molecule, OptimizationResult]:
def load_test_data(name: str) -> Tuple[OptimizationSpecification, Molecule, QCEl_OptimizationResult]:
test_data = read_record_data(name)

return (
pydantic.parse_obj_as(OptimizationSpecification, test_data["specification"]),
pydantic.parse_obj_as(Molecule, test_data["initial_molecule"]),
pydantic.parse_obj_as(OptimizationResult, test_data["result"]),
pydantic.parse_obj_as(QCEl_OptimizationResult, test_data["result"]),
)


@@ -86,7 +91,7 @@ def submit_test_data(
name: str,
tag: Optional[str] = "*",
priority: PriorityEnum = PriorityEnum.normal,
) -> Tuple[int, OptimizationResult]:
) -> Tuple[int, QCEl_OptimizationResult]:
input_spec, molecule, result = load_test_data(name)
meta, record_ids = storage_socket.records.optimization.add([molecule], input_spec, tag, priority, None, None, True)
assert meta.success