From e1a2de020a701087fd6c4538a466eb9d0f2138c9 Mon Sep 17 00:00:00 2001 From: Paul Saxe Date: Wed, 26 Oct 2022 09:37:10 -0400 Subject: [PATCH] Capturing stderr from OpenBabel and printing warnings. --- read_structure_step/formats/mop/obabel.py | 177 ++++++++++++++++------ 1 file changed, 133 insertions(+), 44 deletions(-) diff --git a/read_structure_step/formats/mop/obabel.py b/read_structure_step/formats/mop/obabel.py index 266c48c..3decefc 100644 --- a/read_structure_step/formats/mop/obabel.py +++ b/read_structure_step/formats/mop/obabel.py @@ -2,6 +2,11 @@ Implementation of the reader for XYZ files using OpenBabel """ +import os +import sys +import threading +import time + import logging from pathlib import Path import re @@ -47,6 +52,82 @@ } +class OutputGrabber(object): + """Class used to grab standard output or another stream. + + see https://stackoverflow.com/questions/24277488/in-python-how-to-capture-the-stdout-from-a-c-shared-library-to-a-variable/29834357#29834357 # noqa: E501 + """ + + escape_char = "\b" + + def __init__(self, stream=None, threaded=False): + self.origstream = stream + self.threaded = threaded + if self.origstream is None: + self.origstream = sys.stdout + self.origstreamfd = self.origstream.fileno() + self.capturedtext = "" + # Create a pipe so the stream can be captured: + self.pipe_out, self.pipe_in = os.pipe() + + def __enter__(self): + self.start() + return self + + def __exit__(self, type, value, traceback): + self.stop() + + def start(self): + """ + Start capturing the stream data. + """ + self.capturedtext = "" + # Save a copy of the stream: + self.streamfd = os.dup(self.origstreamfd) + # Replace the original stream with our write pipe: + os.dup2(self.pipe_in, self.origstreamfd) + if self.threaded: + # Start thread that will read the stream: + self.workerThread = threading.Thread(target=self.readOutput) + self.workerThread.start() + # Make sure that the thread is running and os.read() has executed: + time.sleep(0.01) + + def stop(self): + """ + Stop capturing the stream data and save the text in `capturedtext`. + """ + # Print the escape character to make the readOutput method stop: + self.origstream.write(self.escape_char) + # Flush the stream to make sure all our data goes in before + # the escape character: + self.origstream.flush() + if self.threaded: + # wait until the thread finishes so we are sure that + # we have until the last character: + self.workerThread.join() + else: + self.readOutput() + # Close the pipe: + os.close(self.pipe_in) + os.close(self.pipe_out) + # Restore the original stream: + os.dup2(self.streamfd, self.origstreamfd) + # Close the duplicate stream: + os.close(self.streamfd) + + def readOutput(self): + """ + Read the stream data (one byte at a time) + and save the text in `capturedtext`. + """ + while True: + char = os.read(self.pipe_out, 1).decode(self.origstream.encoding) + if not char or self.escape_char in char: + break + self.capturedtext += char + + def _find_charge(regex, input_file): text = re.search(regex, input_file) if text is not None: @@ -303,56 +384,64 @@ def load_mop( logger.info(f"Input data:\n\n{input_data}\n") # Now try to convert using OpenBabel - obConversion = openbabel.OBConversion() - if internals: - obConversion.SetInFormat("mopin") - else: - obConversion.SetInFormat("mopcrt") - - obMol = openbabel.OBMol() - try: - if run_mopac: - raise RuntimeError("Forcing use of MOPAC output") - success = obConversion.ReadString(obMol, input_data) - if not success: - raise RuntimeError("obConversion failed") - except Exception: - logger.info("**** falling back to MOPAC") - # Try using a MOPAC output file instead. Works for e.g. mixed coordinates - mopac_exe = find_mopac() - if mopac_exe is None: - raise FileNotFoundError("The MOPAC executable could not be found") - - # Create an input file - text = ["0SCF", "title", "description"] - text.extend(raw_geometry_lines) - # An empty line denotes end of input - text.append(" ") - files = {"mopac.dat": "\n".join(text)} - - logger.debug(f"MOPAC input file:\n\n{files['mopac.dat']}\n") - - local = seamm.ExecLocal() - result = local.run( - cmd=[mopac_exe, "mopac.dat"], files=files, return_files=["mopac.out"] - ) + out = OutputGrabber(sys.stderr) + with out: + obConversion = openbabel.OBConversion() + if internals: + obConversion.SetInFormat("mopin") + else: + obConversion.SetInFormat("mopcrt") + + obMol = openbabel.OBMol() + try: + if run_mopac: + raise RuntimeError("Forcing use of MOPAC output") + success = obConversion.ReadString(obMol, input_data) + if not success: + raise RuntimeError("obConversion failed") + except Exception: + logger.info("**** falling back to MOPAC") + # Try using a MOPAC output file instead. Works for e.g. mixed coordinates + mopac_exe = find_mopac() + if mopac_exe is None: + raise FileNotFoundError("The MOPAC executable could not be found") + + # Create an input file + text = ["0SCF", "title", "description"] + text.extend(raw_geometry_lines) + # An empty line denotes end of input + text.append(" ") + files = {"mopac.dat": "\n".join(text)} + + logger.debug(f"MOPAC input file:\n\n{files['mopac.dat']}\n") + + local = seamm.ExecLocal() + result = local.run( + cmd=[mopac_exe, "mopac.dat"], files=files, return_files=["mopac.out"] + ) + + if result["mopac.out"]["data"] is None: + raise RuntimeError("MOPAC failed: " + result["mopac.out"]["exception"]) - if result["mopac.out"]["data"] is None: - raise RuntimeError("MOPAC failed: " + result["mopac.out"]["exception"]) + text = result["mopac.out"]["data"] - text = result["mopac.out"]["data"] + logger.debug(f"MOPAC output:\n\n{text}\n") - logger.debug(f"MOPAC output:\n\n{text}\n") + obConversion.SetInFormat("mopout") + success = obConversion.ReadString(obMol, text) - obConversion.SetInFormat("mopout") - success = obConversion.ReadString(obMol, text) + if not success: + raise RuntimeError("Could not process MOPAC file") - if not success: - raise RuntimeError("Could not process MOPAC file") + if add_hydrogens: + obMol.AddHydrogens() + configuration.from_OBMol(obMol) - if add_hydrogens: - obMol.AddHydrogens() - configuration.from_OBMol(obMol) + # Check any stderr information from obabel. + if out.capturedtext != "": + tmp = out.capturedtext + if "Failed to kekulize aromatic bonds in OBMol::PerceiveBondOrders" not in tmp: + logger.warning(tmp) # Record the charge, and the spin state charge = 0