diff --git a/src/ephemeris/_config_models.py b/src/ephemeris/_config_models.py index d593c35..38b97c5 100644 --- a/src/ephemeris/_config_models.py +++ b/src/ephemeris/_config_models.py @@ -7,10 +7,17 @@ ) import yaml -from pydantic import ( - BaseModel, - Extra, -) + +try: + from pydantic.v1 import ( + BaseModel, + Extra, + ) +except ImportError: + from pydantic import ( + BaseModel, + Extra, + ) StrOrPath = Union[Path, str] @@ -35,7 +42,18 @@ class RepositoryInstallTargets(BaseModel): tools: List[RepositoryInstallTarget] -class DataManager(BaseModel, extra=Extra.forbid): +class DictOrValue(BaseModel): + __root__: Union[Dict[str, Union[str, int, float, bool, "DictOrValue"]], Union[str, int, float, bool]] + + +class Parameters(BaseModel): + parameters: Optional[DictOrValue] = None + + +DictOrValue.update_forward_refs() + + +class DataManager(Parameters, extra=Extra.forbid): tags: List[str] tool_id: str @@ -61,7 +79,7 @@ class Genome(BaseModel): # Description of actions (data managers) to run on target genome. indexers: Optional[ - List[str] + List[Union[str, Dict[str, Parameters]]] ] # indexers to run - keyed on repository name - see data_managers.yml for how to resolve these to tools skiplist: Optional[List[str]] = ( None # unimplemented: but if we implement classes of indexers, these will be ones to skip diff --git a/src/ephemeris/_idc_data_managers_to_tools.py b/src/ephemeris/_idc_data_managers_to_tools.py index dd14ead..c3e857b 100644 --- a/src/ephemeris/_idc_data_managers_to_tools.py +++ b/src/ephemeris/_idc_data_managers_to_tools.py @@ -7,11 +7,6 @@ """ import argparse import logging -from typing import ( - Dict, - List, - NamedTuple, -) import yaml @@ -29,29 +24,10 @@ ) -class DataManager(NamedTuple): - tool_id: str - repository_name: str - tags: List[str] - - -def read_data_managers_configuration(path: str) -> Dict[str, DataManager]: - raw_data_managers = read_data_managers(path) - data_managers: Dict[str, DataManager] = {} - for repository_name, data_manager_configuration in raw_data_managers.__root__.items(): - data_manager = DataManager( - tool_id=data_manager_configuration.tool_id, - repository_name=repository_name, - tags=data_manager_configuration.tags or [], - ) - data_managers[repository_name] = data_manager - return data_managers - - def build_shed_install_conf(path: str) -> dict: - data_managers = read_data_managers_configuration(path) + data_managers = read_data_managers(path) tools = [] - for data_manager in data_managers.values(): + for data_manager in data_managers.__root__.values(): tool_id = data_manager.tool_id tool_id_parts = tool_id.split("/") repo_owner = tool_id_parts[2] diff --git a/src/ephemeris/_idc_split_data_manager_genomes.py b/src/ephemeris/_idc_split_data_manager_genomes.py index da737b7..222f3c4 100644 --- a/src/ephemeris/_idc_split_data_manager_genomes.py +++ b/src/ephemeris/_idc_split_data_manager_genomes.py @@ -6,6 +6,7 @@ by genomes.yml that have already been executed and appear in the target installed data table configuration. """ +import json import logging import os import re @@ -22,15 +23,24 @@ import requests import yaml from galaxy.util import safe_makedirs -from pydantic import ( - BaseModel, - Extra, -) + +try: + from pydantic.v1 import ( + BaseModel, + Extra, + ) +except ImportError: + from pydantic import ( + BaseModel, + Extra, + ) from . import get_galaxy_connection -from ._idc_data_managers_to_tools import ( +from ._config_models import ( DataManager, - read_data_managers_configuration, + DataManagers, + DictOrValue, + read_data_managers, ) from .common_parser import get_common_args from .ephemeris_log import ( @@ -70,8 +80,8 @@ class SplitOptions: filters: Filters = Filters() -def tool_id_for(indexer: str, data_managers: Dict[str, DataManager], mode: str) -> str: - data_manager = data_managers[indexer] +def tool_id_for(indexer: str, data_managers: DataManagers, mode: str) -> str: + data_manager = data_managers.__root__[indexer] assert data_manager, f"Could not find a target data manager for indexer name {indexer}" tool_shed_guid = data_manager.tool_id if mode == "short": @@ -88,19 +98,13 @@ def tool_id_for(indexer: str, data_managers: Dict[str, DataManager], mode: str) class RunDataManager(BaseModel): id: str items: Optional[List[Any]] = None - params: Optional[List[Any]] = None - data_table_reload: Optional[List[str]] = None + params: Optional[DictOrValue] = None class RunDataManagers(BaseModel): data_managers: List[RunDataManager] -class DataManager(BaseModel, extra=Extra.forbid): - tags: List[str] - tool_id: str - - class DataManagers(BaseModel, extra=Extra.forbid): __root__: Dict[str, DataManager] @@ -149,7 +153,7 @@ def write_run_data_manager_to_file(run_data_manager: RunDataManager, path: str): def walk_over_incomplete_runs(split_options: SplitOptions): - data_managers = read_data_managers_configuration(split_options.data_managers_path) + data_managers = read_data_managers(split_options.data_managers_path) with open(split_options.merged_genomes_path) as f: genomes_all = yaml.safe_load(f) genomes = genomes_all["genomes"] @@ -169,36 +173,34 @@ def walk_over_incomplete_runs(split_options: SplitOptions): if do_fetch and not split_options.is_build_complete(build_id, fetch_indexer): log.info(f"Fetching: {build_id}") fetch_tool_id = tool_id_for(fetch_indexer, data_managers, split_options.tool_id_mode) - fetch_params = [] - fetch_params.append({"dbkey_source|dbkey_source_selector": "new"}) - fetch_params.append({"dbkey_source|dbkey": genome["id"]}) description = genome.get("description") + fetch_params = { + "dbkey_source": {"dbkey_source_selector": "new", "dbkey": genome["id"]}, + "sequence_id": genome["id"], + "sequence_name": description, + } source = genome.get("source") if source == "ucsc": if not description: - description = ucsc_description_for_build(genome["id"]) - fetch_params.append({"reference_source|reference_source_selector": "ucsc"}) - fetch_params.append({"reference_source|requested_dbkey": genome["id"]}) - fetch_params.append({"sequence_name": description}) + fetch_params["sequence_name"] = ucsc_description_for_build(genome["id"]) + fetch_params["reference_source"] = { + "reference_source_selector": "ucsc", + "requested_dbkey": genome["id"], + } elif re.match("^[A-Z_]+[0-9.]+", source): - fetch_params.append({"reference_source|reference_source_selector": "ncbi"}) - fetch_params.append({"reference_source|requested_identifier": source}) - fetch_params.append({"sequence_name": genome["description"]}) - fetch_params.append({"sequence.id": genome["id"]}) + fetch_params["reference_source"] = { + "reference_source_selector": "ncbi", + "requested_identifier": source, + } elif re.match("^http", source): - fetch_params.append({"reference_source|reference_source_selector": "url"}) - fetch_params.append({"reference_source|user_url": source}) - fetch_params.append({"sequence_name": genome["description"]}) - fetch_params.append({"sequence.id": genome["id"]}) + fetch_params["reference_source"] = {"reference_source_selector": "url", "user_url": source} if description: - fetch_params.append({"dbkey_source|dbkey_name": description}) + fetch_params["dbkey_source"]["dbkey_name"] = description fetch_run_data_manager = RunDataManager( id=fetch_tool_id, params=fetch_params, - # Not needed according to Marius - # data_table_reload=["all_fasta", "__dbkeys__"], ) yield (build_id, fetch_indexer, fetch_run_data_manager) else: @@ -206,29 +208,36 @@ def walk_over_incomplete_runs(split_options: SplitOptions): indexers = genome.get("indexers", []) for indexer in indexers: - if split_options.filters.filter_out_data_manager(indexer): + if isinstance(indexer, dict): + indexer_name = next(iter(indexer.keys())) + indexer_parameters = indexer[indexer_name]["parameters"] + else: + indexer_name = indexer + indexer_parameters = {} + if split_options.filters.filter_out_data_manager(indexer_name): continue if split_options.filters.filter_out_stage(1): continue - if split_options.is_build_complete(build_id, indexer): - log.debug(f"Build is already completed: {build_id} {indexer}") + if split_options.is_build_complete(build_id, indexer_name): + log.debug(f"Build is already completed: {build_id} {indexer_name}") continue - log.info(f"Building: {build_id} {indexer}") - - tool_id = tool_id_for(indexer, data_managers, split_options.tool_id_mode) - params = [ - {"all_fasta_source": "{{ item.id }}"}, - {"sequence_name": "{{ item.name }}"}, - {"sequence_id": "{{ item.id }}"}, - ] - # why is this not pulled from the data managers conf? -nate - if re.search("bwa", tool_id): - params.append({"index_algorithm": "bwtsw"}) - if re.search("color_space", tool_id): - continue + log.info(f"Building: {build_id} {indexer_name}") + + tool_id = tool_id_for(indexer_name, data_managers, split_options.tool_id_mode) + data_manager = data_managers.__root__[indexer_name] + data_manager_parameters = {} + if data_manager.parameters: + data_manager_parameters = json.loads(data_manager.parameters.json()) or {} + data_manager_parameters.update(indexer_parameters) + if not data_manager_parameters: + data_manager_parameters = { + "all_fasta_source": "{{ item.id }}", + "sequence_name": "{{ item.name }}", + "sequence_id": "{{ item.id }}", + } item = deepcopy(genome) item.pop("indexers", None) @@ -236,10 +245,10 @@ def walk_over_incomplete_runs(split_options: SplitOptions): run_data_manager = RunDataManager( id=tool_id, - params=params, + params=data_manager_parameters, items=[item], ) - yield (build_id, indexer, run_data_manager) + yield (build_id, indexer_name, run_data_manager) def split_genomes(split_options: SplitOptions) -> None: diff --git a/src/ephemeris/run_data_managers.py b/src/ephemeris/run_data_managers.py index c109429..882ce1a 100644 --- a/src/ephemeris/run_data_managers.py +++ b/src/ephemeris/run_data_managers.py @@ -33,6 +33,7 @@ from bioblend.galaxy import GalaxyInstance from bioblend.galaxy.tool_data import ToolDataClient from bioblend.galaxy.tools import ToolClient +from boltons.iterutils import remap from jinja2 import Template from typing_extensions import Literal @@ -160,14 +161,21 @@ def get_dm_jobs(self, dm): def handle_item(item: str): dm_id = dm["id"] params = dm["params"] - inputs = dict() - # Iterate over all parameters, replace occurences of {{item}} with the current processing item + + # Iterate over all parameters, replace occurrences of {{item}} with the current processing item # and create the tool_inputs dict for running the data manager job - for param in params: - key, value = list(param.items())[0] - value_template = Template(value) - value = value_template.render(item=item) - inputs.update({key: value}) + def template_values(path, key, value): + if isinstance(value, str): + value = Template(value).render(item=item) + return key, value + + inputs = remap(params, visit=template_values) + + if isinstance(inputs, list): + # "legacy" run data manager input format + # which is a flat list with stringified param keys and values, + # this needs to be turned into a dict + inputs = {k: v for param in inputs for k, v in param.items()} job = dict(tool_id=dm_id, inputs=inputs) diff --git a/tests/test_split_genomes.py b/tests/test_split_genomes.py index 8e186f3..af139e2 100644 --- a/tests/test_split_genomes.py +++ b/tests/test_split_genomes.py @@ -49,13 +49,36 @@ - genome """ +DATA_MANAGER_YAML_WITH_PARAMS = """ +the_data_manager: + tool_id: toolshed.g2.bx.psu.edu/repos/iuc/the_data_manager/the_data_manager/0.0.1 + parameters: + conditional: + param_a: a + param_b: b + tags: + - dm_tag +""" + +GENOMES_WITH_PARAMS = """ +genomes: + - dbkey: cat + description: fluffy + id: cat + indexers: + - the_data_manager: + parameters: + conditional: + param_c: c +""" + -def setup_mock_idc_dir(directory: Path): +def setup_mock_idc_dir(directory: Path, genomes=MERGED_YAML_STR, data_managers=DATA_MANAGER_YAML_STR): merged = directory / "genomes.yml" - merged.write_text(MERGED_YAML_STR) + merged.write_text(genomes) - data_managers = directory / "data_managers.yml" - data_managers.write_text(DATA_MANAGER_YAML_STR) + data_managers_path = directory / "data_managers.yml" + data_managers_path.write_text(data_managers) def read_and_validate_run_data_manager_yaml(path): @@ -98,6 +121,20 @@ def test_split_genomes(tmp_path: Path): assert data_manager.items[0]["dbkey"] == "hg19_rCRS_pUC18_phiX174" +def test_split_genomes_with_params(tmp_path): + setup_mock_idc_dir(tmp_path, GENOMES_WITH_PARAMS, DATA_MANAGER_YAML_WITH_PARAMS) + split_path = tmp_path / "split" + split_options = split_options_for(tmp_path) + split_genomes(split_options) + new_task = split_path / "cat" / "the_data_manager" + new_task_run_yaml = new_task / "run_data_managers.yaml" + run = read_and_validate_run_data_manager_yaml(new_task_run_yaml) + assert len(run.data_managers) == 1 + data_manager = run.data_managers[0] + # genome config overwrites data manager config + assert data_manager.params.json() == '{"conditional": {"param_c": "c"}}' + + def test_split_genomes_short_ids(tmp_path: Path): setup_mock_idc_dir(tmp_path) split_path = tmp_path / "split"