Skip to content

Commit

Permalink
Capturing stderr from OpenBabel and printing warnings.
Browse files Browse the repository at this point in the history
  • Loading branch information
paulsaxe committed Oct 26, 2022
1 parent 3c16d3f commit e1a2de0
Showing 1 changed file with 133 additions and 44 deletions.
177 changes: 133 additions & 44 deletions read_structure_step/formats/mop/obabel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
Implementation of the reader for XYZ files using OpenBabel
"""

import os
import sys
import threading
import time

import logging
from pathlib import Path
import re
Expand Down Expand Up @@ -47,6 +52,82 @@
}


class OutputGrabber(object):
"""Class used to grab standard output or another stream.
see https://stackoverflow.com/questions/24277488/in-python-how-to-capture-the-stdout-from-a-c-shared-library-to-a-variable/29834357#29834357 # noqa: E501
"""

escape_char = "\b"

def __init__(self, stream=None, threaded=False):
self.origstream = stream
self.threaded = threaded
if self.origstream is None:
self.origstream = sys.stdout
self.origstreamfd = self.origstream.fileno()
self.capturedtext = ""
# Create a pipe so the stream can be captured:
self.pipe_out, self.pipe_in = os.pipe()

def __enter__(self):
self.start()
return self

def __exit__(self, type, value, traceback):
self.stop()

def start(self):
"""
Start capturing the stream data.
"""
self.capturedtext = ""
# Save a copy of the stream:
self.streamfd = os.dup(self.origstreamfd)
# Replace the original stream with our write pipe:
os.dup2(self.pipe_in, self.origstreamfd)
if self.threaded:
# Start thread that will read the stream:
self.workerThread = threading.Thread(target=self.readOutput)
self.workerThread.start()
# Make sure that the thread is running and os.read() has executed:
time.sleep(0.01)

def stop(self):
"""
Stop capturing the stream data and save the text in `capturedtext`.
"""
# Print the escape character to make the readOutput method stop:
self.origstream.write(self.escape_char)
# Flush the stream to make sure all our data goes in before
# the escape character:
self.origstream.flush()
if self.threaded:
# wait until the thread finishes so we are sure that
# we have until the last character:
self.workerThread.join()
else:
self.readOutput()
# Close the pipe:
os.close(self.pipe_in)
os.close(self.pipe_out)
# Restore the original stream:
os.dup2(self.streamfd, self.origstreamfd)
# Close the duplicate stream:
os.close(self.streamfd)

def readOutput(self):
"""
Read the stream data (one byte at a time)
and save the text in `capturedtext`.
"""
while True:
char = os.read(self.pipe_out, 1).decode(self.origstream.encoding)
if not char or self.escape_char in char:
break
self.capturedtext += char


def _find_charge(regex, input_file):
text = re.search(regex, input_file)
if text is not None:
Expand Down Expand Up @@ -303,56 +384,64 @@ def load_mop(
logger.info(f"Input data:\n\n{input_data}\n")

# Now try to convert using OpenBabel
obConversion = openbabel.OBConversion()
if internals:
obConversion.SetInFormat("mopin")
else:
obConversion.SetInFormat("mopcrt")

obMol = openbabel.OBMol()
try:
if run_mopac:
raise RuntimeError("Forcing use of MOPAC output")
success = obConversion.ReadString(obMol, input_data)
if not success:
raise RuntimeError("obConversion failed")
except Exception:
logger.info("**** falling back to MOPAC")
# Try using a MOPAC output file instead. Works for e.g. mixed coordinates
mopac_exe = find_mopac()
if mopac_exe is None:
raise FileNotFoundError("The MOPAC executable could not be found")

# Create an input file
text = ["0SCF", "title", "description"]
text.extend(raw_geometry_lines)
# An empty line denotes end of input
text.append(" ")
files = {"mopac.dat": "\n".join(text)}

logger.debug(f"MOPAC input file:\n\n{files['mopac.dat']}\n")

local = seamm.ExecLocal()
result = local.run(
cmd=[mopac_exe, "mopac.dat"], files=files, return_files=["mopac.out"]
)
out = OutputGrabber(sys.stderr)
with out:
obConversion = openbabel.OBConversion()
if internals:
obConversion.SetInFormat("mopin")
else:
obConversion.SetInFormat("mopcrt")

obMol = openbabel.OBMol()
try:
if run_mopac:
raise RuntimeError("Forcing use of MOPAC output")
success = obConversion.ReadString(obMol, input_data)
if not success:
raise RuntimeError("obConversion failed")
except Exception:
logger.info("**** falling back to MOPAC")
# Try using a MOPAC output file instead. Works for e.g. mixed coordinates
mopac_exe = find_mopac()
if mopac_exe is None:
raise FileNotFoundError("The MOPAC executable could not be found")

# Create an input file
text = ["0SCF", "title", "description"]
text.extend(raw_geometry_lines)
# An empty line denotes end of input
text.append(" ")
files = {"mopac.dat": "\n".join(text)}

logger.debug(f"MOPAC input file:\n\n{files['mopac.dat']}\n")

local = seamm.ExecLocal()
result = local.run(
cmd=[mopac_exe, "mopac.dat"], files=files, return_files=["mopac.out"]
)

if result["mopac.out"]["data"] is None:
raise RuntimeError("MOPAC failed: " + result["mopac.out"]["exception"])

if result["mopac.out"]["data"] is None:
raise RuntimeError("MOPAC failed: " + result["mopac.out"]["exception"])
text = result["mopac.out"]["data"]

text = result["mopac.out"]["data"]
logger.debug(f"MOPAC output:\n\n{text}\n")

logger.debug(f"MOPAC output:\n\n{text}\n")
obConversion.SetInFormat("mopout")
success = obConversion.ReadString(obMol, text)

obConversion.SetInFormat("mopout")
success = obConversion.ReadString(obMol, text)
if not success:
raise RuntimeError("Could not process MOPAC file")

if not success:
raise RuntimeError("Could not process MOPAC file")
if add_hydrogens:
obMol.AddHydrogens()
configuration.from_OBMol(obMol)

if add_hydrogens:
obMol.AddHydrogens()
configuration.from_OBMol(obMol)
# Check any stderr information from obabel.
if out.capturedtext != "":
tmp = out.capturedtext
if "Failed to kekulize aromatic bonds in OBMol::PerceiveBondOrders" not in tmp:
logger.warning(tmp)

# Record the charge, and the spin state
charge = 0
Expand Down

0 comments on commit e1a2de0

Please sign in to comment.