Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dm parameters #213

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/ephemeris/_config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
)

import yaml

StrOrPath = Union[Path, str]

Expand All @@ -35,7 +42,18 @@ class RepositoryInstallTargets(BaseModel):
tools: List[RepositoryInstallTarget]


class DictOrValue(BaseModel):
    """Recursive model for an arbitrarily nested tree of tool parameters.

    Strict scalar types are used deliberately: pydantic v1 validates Union
    members left to right *with coercion*, so with a plain ``str`` member
    listed first every int/float/bool parameter value would silently be
    turned into a string (5 -> "5", True -> "True") before reaching the
    data manager tool.
    """

    __root__: Union[
        Dict[str, Union[StrictBool, StrictInt, StrictFloat, StrictStr, "DictOrValue"]],
        StrictBool,
        StrictInt,
        StrictFloat,
        StrictStr,
    ]


class Parameters(BaseModel):
    """Mixin holding the optional free-form ``parameters`` of an entry."""

    parameters: Optional[DictOrValue] = None


# Resolve the "DictOrValue" forward reference used inside its own annotation.
DictOrValue.update_forward_refs()


class DataManager(Parameters, extra=Extra.forbid):
    """One data manager entry from data_managers.yml (unknown keys rejected)."""

    tags: List[str]  # labels used to group/select data managers
    tool_id: str  # full tool shed GUID, e.g. toolshed.g2.bx.psu.edu/repos/...

Expand All @@ -61,7 +79,7 @@ class Genome(BaseModel):

# Description of actions (data managers) to run on target genome.
indexers: Optional[
List[str]
List[Union[str, Dict[str, Parameters]]]
] # indexers to run - keyed on repository name - see data_managers.yml for how to resolve these to tools
skiplist: Optional[List[str]] = (
None # unimplemented: but if we implement classes of indexers, these will be ones to skip
Expand Down
28 changes: 2 additions & 26 deletions src/ephemeris/_idc_data_managers_to_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
"""
import argparse
import logging
from typing import (
Dict,
List,
NamedTuple,
)

import yaml

Expand All @@ -29,29 +24,10 @@
)


class DataManager(NamedTuple):
tool_id: str
repository_name: str
tags: List[str]


def read_data_managers_configuration(path: str) -> Dict[str, DataManager]:
raw_data_managers = read_data_managers(path)
data_managers: Dict[str, DataManager] = {}
for repository_name, data_manager_configuration in raw_data_managers.__root__.items():
data_manager = DataManager(
tool_id=data_manager_configuration.tool_id,
repository_name=repository_name,
tags=data_manager_configuration.tags or [],
)
data_managers[repository_name] = data_manager
return data_managers


def build_shed_install_conf(path: str) -> dict:
data_managers = read_data_managers_configuration(path)
data_managers = read_data_managers(path)
tools = []
for data_manager in data_managers.values():
for data_manager in data_managers.__root__.values():
tool_id = data_manager.tool_id
tool_id_parts = tool_id.split("/")
repo_owner = tool_id_parts[2]
Expand Down
113 changes: 61 additions & 52 deletions src/ephemeris/_idc_split_data_manager_genomes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
by genomes.yml that have already been executed and appear in the target
installed data table configuration.
"""
import json
import logging
import os
import re
Expand All @@ -22,15 +23,24 @@
import requests
import yaml
from galaxy.util import safe_makedirs
from pydantic import (
BaseModel,
Extra,
)

try:
from pydantic.v1 import (
BaseModel,
Extra,
)
except ImportError:
from pydantic import (
BaseModel,
Extra,
)

from . import get_galaxy_connection
from ._idc_data_managers_to_tools import (
from ._config_models import (
DataManager,
read_data_managers_configuration,
DataManagers,
DictOrValue,
read_data_managers,
)
from .common_parser import get_common_args
from .ephemeris_log import (
Expand Down Expand Up @@ -70,8 +80,8 @@ class SplitOptions:
filters: Filters = Filters()


def tool_id_for(indexer: str, data_managers: Dict[str, DataManager], mode: str) -> str:
data_manager = data_managers[indexer]
def tool_id_for(indexer: str, data_managers: DataManagers, mode: str) -> str:
data_manager = data_managers.__root__[indexer]
assert data_manager, f"Could not find a target data manager for indexer name {indexer}"
tool_shed_guid = data_manager.tool_id
if mode == "short":
Expand All @@ -88,19 +98,13 @@ def tool_id_for(indexer: str, data_managers: Dict[str, DataManager], mode: str)
class RunDataManager(BaseModel):
id: str
items: Optional[List[Any]] = None
params: Optional[List[Any]] = None
data_table_reload: Optional[List[str]] = None
params: Optional[DictOrValue] = None


class RunDataManagers(BaseModel):
data_managers: List[RunDataManager]


class DataManager(BaseModel, extra=Extra.forbid):
tags: List[str]
tool_id: str


class DataManagers(BaseModel, extra=Extra.forbid):
__root__: Dict[str, DataManager]

Expand Down Expand Up @@ -149,7 +153,7 @@ def write_run_data_manager_to_file(run_data_manager: RunDataManager, path: str):


def walk_over_incomplete_runs(split_options: SplitOptions):
data_managers = read_data_managers_configuration(split_options.data_managers_path)
data_managers = read_data_managers(split_options.data_managers_path)
with open(split_options.merged_genomes_path) as f:
genomes_all = yaml.safe_load(f)
genomes = genomes_all["genomes"]
Expand All @@ -169,77 +173,82 @@ def walk_over_incomplete_runs(split_options: SplitOptions):
if do_fetch and not split_options.is_build_complete(build_id, fetch_indexer):
log.info(f"Fetching: {build_id}")
fetch_tool_id = tool_id_for(fetch_indexer, data_managers, split_options.tool_id_mode)
fetch_params = []
fetch_params.append({"dbkey_source|dbkey_source_selector": "new"})
fetch_params.append({"dbkey_source|dbkey": genome["id"]})
description = genome.get("description")
fetch_params = {
"dbkey_source": {"dbkey_source_selector": "new", "dbkey": genome["id"]},
"sequence_id": genome["id"],
"sequence_name": description,
}
source = genome.get("source")
if source == "ucsc":
if not description:
description = ucsc_description_for_build(genome["id"])
fetch_params.append({"reference_source|reference_source_selector": "ucsc"})
fetch_params.append({"reference_source|requested_dbkey": genome["id"]})
fetch_params.append({"sequence_name": description})
fetch_params["sequence_name"] = ucsc_description_for_build(genome["id"])
fetch_params["reference_source"] = {
"reference_source_selector": "ucsc",
"requested_dbkey": genome["id"],
}
elif re.match("^[A-Z_]+[0-9.]+", source):
fetch_params.append({"reference_source|reference_source_selector": "ncbi"})
fetch_params.append({"reference_source|requested_identifier": source})
fetch_params.append({"sequence_name": genome["description"]})
fetch_params.append({"sequence.id": genome["id"]})
fetch_params["reference_source"] = {
"reference_source_selector": "ncbi",
"requested_identifier": source,
}
elif re.match("^http", source):
fetch_params.append({"reference_source|reference_source_selector": "url"})
fetch_params.append({"reference_source|user_url": source})
fetch_params.append({"sequence_name": genome["description"]})
fetch_params.append({"sequence.id": genome["id"]})
fetch_params["reference_source"] = {"reference_source_selector": "url", "user_url": source}

if description:
fetch_params.append({"dbkey_source|dbkey_name": description})
fetch_params["dbkey_source"]["dbkey_name"] = description

fetch_run_data_manager = RunDataManager(
id=fetch_tool_id,
params=fetch_params,
# Not needed according to Marius
# data_table_reload=["all_fasta", "__dbkeys__"],
)
yield (build_id, fetch_indexer, fetch_run_data_manager)
else:
log.debug(f"Fetch is already completed: {build_id}")

indexers = genome.get("indexers", [])
for indexer in indexers:
if split_options.filters.filter_out_data_manager(indexer):
if isinstance(indexer, dict):
indexer_name = next(iter(indexer.keys()))
indexer_parameters = indexer[indexer_name]["parameters"]
else:
indexer_name = indexer
indexer_parameters = {}
if split_options.filters.filter_out_data_manager(indexer_name):
continue

if split_options.filters.filter_out_stage(1):
continue

if split_options.is_build_complete(build_id, indexer):
log.debug(f"Build is already completed: {build_id} {indexer}")
if split_options.is_build_complete(build_id, indexer_name):
log.debug(f"Build is already completed: {build_id} {indexer_name}")
continue

log.info(f"Building: {build_id} {indexer}")

tool_id = tool_id_for(indexer, data_managers, split_options.tool_id_mode)
params = [
{"all_fasta_source": "{{ item.id }}"},
{"sequence_name": "{{ item.name }}"},
{"sequence_id": "{{ item.id }}"},
]
# why is this not pulled from the data managers conf? -nate
if re.search("bwa", tool_id):
params.append({"index_algorithm": "bwtsw"})
if re.search("color_space", tool_id):
continue
log.info(f"Building: {build_id} {indexer_name}")

tool_id = tool_id_for(indexer_name, data_managers, split_options.tool_id_mode)
data_manager = data_managers.__root__[indexer_name]
data_manager_parameters = {}
if data_manager.parameters:
data_manager_parameters = json.loads(data_manager.parameters.json()) or {}
data_manager_parameters.update(indexer_parameters)
if not data_manager_parameters:
data_manager_parameters = {
"all_fasta_source": "{{ item.id }}",
"sequence_name": "{{ item.name }}",
"sequence_id": "{{ item.id }}",
}

item = deepcopy(genome)
item.pop("indexers", None)
item.pop("skiplist", None)

run_data_manager = RunDataManager(
id=tool_id,
params=params,
params=data_manager_parameters,
items=[item],
)
yield (build_id, indexer, run_data_manager)
yield (build_id, indexer_name, run_data_manager)


def split_genomes(split_options: SplitOptions) -> None:
Expand Down
22 changes: 15 additions & 7 deletions src/ephemeris/run_data_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from bioblend.galaxy import GalaxyInstance
from bioblend.galaxy.tool_data import ToolDataClient
from bioblend.galaxy.tools import ToolClient
from boltons.iterutils import remap
from jinja2 import Template
from typing_extensions import Literal

Expand Down Expand Up @@ -160,14 +161,21 @@ def get_dm_jobs(self, dm):
def handle_item(item: str):
dm_id = dm["id"]
params = dm["params"]
inputs = dict()
# Iterate over all parameters, replace occurences of {{item}} with the current processing item

# Iterate over all parameters, replace occurrences of {{item}} with the current processing item
# and create the tool_inputs dict for running the data manager job
for param in params:
key, value = list(param.items())[0]
value_template = Template(value)
value = value_template.render(item=item)
inputs.update({key: value})
def template_values(path, key, value):
if isinstance(value, str):
value = Template(value).render(item=item)
return key, value

inputs = remap(params, visit=template_values)

if isinstance(inputs, list):
# "legacy" run data manager input format
# which is a flat list with stringified param keys and values,
# this needs to be turned into a dict
inputs = {k: v for param in inputs for k, v in param.items()}

job = dict(tool_id=dm_id, inputs=inputs)

Expand Down
45 changes: 41 additions & 4 deletions tests/test_split_genomes.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,36 @@
- genome
"""

DATA_MANAGER_YAML_WITH_PARAMS = """
the_data_manager:
tool_id: toolshed.g2.bx.psu.edu/repos/iuc/the_data_manager/the_data_manager/0.0.1
parameters:
conditional:
param_a: a
param_b: b
tags:
- dm_tag
"""

GENOMES_WITH_PARAMS = """
genomes:
- dbkey: cat
description: fluffy
id: cat
indexers:
- the_data_manager:
parameters:
conditional:
param_c: c
"""


def setup_mock_idc_dir(directory: Path):
def setup_mock_idc_dir(directory: Path, genomes=MERGED_YAML_STR, data_managers=DATA_MANAGER_YAML_STR):
merged = directory / "genomes.yml"
merged.write_text(MERGED_YAML_STR)
merged.write_text(genomes)

data_managers = directory / "data_managers.yml"
data_managers.write_text(DATA_MANAGER_YAML_STR)
data_managers_path = directory / "data_managers.yml"
data_managers_path.write_text(data_managers)


def read_and_validate_run_data_manager_yaml(path):
Expand Down Expand Up @@ -98,6 +121,20 @@ def test_split_genomes(tmp_path: Path):
assert data_manager.items[0]["dbkey"] == "hg19_rCRS_pUC18_phiX174"


def test_split_genomes_with_params(tmp_path):
    """Genome-level indexer parameters take precedence over data manager defaults."""
    setup_mock_idc_dir(tmp_path, GENOMES_WITH_PARAMS, DATA_MANAGER_YAML_WITH_PARAMS)
    split_genomes(split_options_for(tmp_path))

    run_yaml = tmp_path / "split" / "cat" / "the_data_manager" / "run_data_managers.yaml"
    run = read_and_validate_run_data_manager_yaml(run_yaml)

    assert len(run.data_managers) == 1
    # genome config overwrites data manager config
    assert run.data_managers[0].params.json() == '{"conditional": {"param_c": "c"}}'


def test_split_genomes_short_ids(tmp_path: Path):
setup_mock_idc_dir(tmp_path)
split_path = tmp_path / "split"
Expand Down
Loading