Skip to content

Commit

Permalink
Update transformer, now transforms all to obojson
Browse files Browse the repository at this point in the history
  • Loading branch information
caufieldjh committed Aug 16, 2024
1 parent b27e41c commit 97b742b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/kg_bioportal/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ def download(
return None


@cli.command()
@main.command()
@click.option("input_dir", "-i", default="data/raw", type=click.Path(exists=True))
@click.option("output_dir", "-o", default="data/transformed")
def transform(*args, **kwargs) -> None:
def transform(input_dir, output_dir) -> None:
"""Transforms all ontologies in the input directory to KGX nodes and edges.
Args:
Expand Down
57 changes: 57 additions & 0 deletions src/kg_bioportal/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import logging
import os
import sys

from kg_bioportal.downloader import ONTOLOGY_LIST_NAME
from kg_bioportal.robot_utils import initialize_robot, robot_convert


class Transformer:
Expand All @@ -13,6 +17,8 @@ def __init__(
) -> None:
"""Initializes the Transformer class.
Also sets up ROBOT.
Args:
input_dir: A string pointing to the location of the raw data.
Expand All @@ -29,6 +35,14 @@ def __init__(
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)

# Set up ROBOT; transform() passes robot_path and robot_env to the converter.
# The ROBOT location is resolved relative to the current working directory.
logging.info("Setting up ROBOT...")
self.robot_path = os.path.join(os.getcwd(), "robot")
self.robot_params = initialize_robot(self.robot_path)
logging.info(f"ROBOT path: {self.robot_path}")
# The second element returned by initialize_robot is kept as the ROBOT
# environment; it is indexed by "ROBOT_JAVA_ARGS" in the log call below.
self.robot_env = self.robot_params[1]
logging.info(f"ROBOT environment variables: {self.robot_env['ROBOT_JAVA_ARGS']}")

return None

def transform_all(self) -> None:
Expand All @@ -45,4 +59,47 @@ def transform_all(self) -> None:
f"Transforming all ontologies in {self.input_dir} to KGX nodes and edges."
)

filepaths = [
os.path.join(self.input_dir, f)
for f in os.listdir(self.input_dir)
if not f.endswith(ONTOLOGY_LIST_NAME)
]

if len(filepaths) == 0:
logging.ERROR(f"No ontologies found in {self.input_dir}.")
sys.exit()
else:
logging.info(f"Found {len(filepaths)} ontologies to transform.")

for filepath in filepaths:
if not self.transform(filepath):
logging.ERROR(f"Error transforming {filepath}.")
else:
logging.info(f"Transformed {filepath}.")

return None

def transform(self, ontology: str) -> bool:
    """Transforms a single ontology to KGX nodes and edges.

    The input file is handed to ROBOT for conversion and the result is
    written to the output directory as ``<name>.json``, where ``<name>`` is
    the input file name without its extension.

    Args:
        ontology: Path to the ontology file to transform.

    Returns:
        True if transform was successful, otherwise False.
    """
    logging.info(f"Transforming {ontology} to nodes and edges.")
    ontology_name = os.path.splitext(os.path.basename(ontology))[0]
    output_path = os.path.join(self.output_dir, f"{ontology_name}.json")

    # A falsy result from robot_convert means the conversion failed. Return
    # that outcome directly so a failure is not overwritten with True and
    # the caller can report it.
    return bool(
        robot_convert(
            robot_path=self.robot_path,
            input_path=ontology,
            output_path=output_path,
            robot_env=self.robot_env,
        )
    )

0 comments on commit 97b742b

Please sign in to comment.