diff --git a/dockerfiles/sv-pipeline-virtual-env/Dockerfile b/dockerfiles/sv-pipeline-virtual-env/Dockerfile index 1a3243c02..6243b6f94 100644 --- a/dockerfiles/sv-pipeline-virtual-env/Dockerfile +++ b/dockerfiles/sv-pipeline-virtual-env/Dockerfile @@ -20,7 +20,7 @@ RUN apt-get -qqy update --fix-missing && \ # install conda packages # NOTE: need to use scipy=1.7.3 instead of scipy=1.8.0 because it makes hail angry ARG CONDA_PKGS="cython=0.29.28 numpy=1.22.3 pandas=1.4.2 scipy=1.7.3 scikit-learn=1.0.2 intervaltree=3.1.0 \ - matplotlib=3.5.1 natsort=8.1.0 google-cloud-dataproc=4.0.2" + matplotlib=3.5.1 natsort=8.1.0 google-cloud-dataproc=4.0.2 seaborn=0.12.2" RUN mamba install -qy --freeze-installed -n $CONDA_ENV_NAME -c conda-forge -c bioconda $CONDA_PKGS # copy in HTSLIB install so that pysam uses same version as is available in pipeline @@ -48,9 +48,9 @@ RUN export SETUPTOOLS_VERSION=$(python -c 'import setuptools; print(setuptools._ pip install setuptools==$SETUPTOOLS_VERSION # pybedtools needs to be installed via pip because it doesn't like the updated python -# hail's latest version is only available via pip or local build +# hail's latest version is only available via pip or local build. Run cache purge in case the base is out of date. ARG PIP_PKGS="pybedtools==0.9.0 hail==0.2.93" -RUN pip3 --no-cache-dir install $PIP_PKGS +RUN pip3 cache purge && pip3 --no-cache-dir install $PIP_PKGS # clean unneeded stuff RUN conda clean -ay --force-pkgs-dirs @@ -79,7 +79,7 @@ RUN export NEW_PACKAGES=$(diff_of_lists.sh "$RUN_DEPS" $APT_REQUIRED_PACKAGES) & # install R packages ARG R_PACKAGES="assertthat beeswarm BH BSDA caret cli crayon DAAG data.table devtools digest dplyr e1071 fansi fpc \ - generics gert glue HardyWeinberg hash latticeExtra magrittr Matrix metap mnormt nlme nloptr nnet \ + generics gert glue HardyWeinberg hash latticeExtra magrittr metap mnormt nlme nloptr nnet \ numDeriv perm pillar pkgconfig plogr plyr purrr pwr R6 RColorBrewer Rcpp reshape reshape2 rlang ROCR \ rpart stringi stringr survival tibble tidyr tidyselect utf8 vioplot withr zoo" ARG BIOCONDUCTOR_PKGS="SNPRelate multtest" @@ -89,6 +89,7 @@ RUN export APT_TRANSIENT_PACKAGES=$(diff_of_lists.sh "$BUILD_DEPS" $APT_REQUIRED apt-get -qqy install --no-install-recommends $BUILD_DEPS $(fix_spaces.sh $APT_REQUIRED_PACKAGES) && \ install_bioconductor_packages.R $BIOCONDUCTOR_PKGS && \ install_deprecated_R_package.sh "https://cran.r-project.org/src/contrib/Archive/MASS/MASS_7.3-58.tar.gz" && \ + install_deprecated_R_package.sh "https://cran.r-project.org/src/contrib/Archive/Matrix/Matrix_1.6-5.tar.gz" && \ install_R_packages.R $R_PACKAGES && \ apt-get -qqy remove --purge $APT_TRANSIENT_PACKAGES && \ apt-get -qqy autoremove --purge && \ @@ -100,18 +101,4 @@ RUN export APT_TRANSIENT_PACKAGES=$(diff_of_lists.sh "$BUILD_DEPS" $APT_REQUIRED /usr/share/man/?? 
\ /usr/share/man/??_* -# Install plink2 & KING (for relatedness inference) -ARG KING_URL="https://www.kingrelatedness.com/executables/Linux-king215.tar.gz" -RUN mkdir -p /opt/bin && \ - cd /opt/bin && \ - wget -q $KING_URL && \ - tar -xzf Linux-king215.tar.gz && \ - rm -f Linux-king215.tar.gz - -ARG PLINK2_URL="https://github.com/chrchang/plink-ng/releases/download/2019/plink2_linux_x86_64_20190107.zip" -RUN cd /opt/bin && \ - wget -q $PLINK2_URL && \ - unzip plink2_linux_x86_64_20190107.zip && \ - rm -f plink2_linux_x86_64_20190107.zip - ENV PATH=/opt/bin:$PATH diff --git a/inputs/templates/test/FilterGenotypes/FilterGenotypes.optimize_cutoffs.json.tmpl b/inputs/templates/test/FilterGenotypes/FilterGenotypes.optimize_cutoffs.json.tmpl new file mode 100644 index 000000000..b11a12812 --- /dev/null +++ b/inputs/templates/test/FilterGenotypes/FilterGenotypes.optimize_cutoffs.json.tmpl @@ -0,0 +1,38 @@ +{ + "FilterGenotypes.vcf": {{ test_batch.concordance_vcf | tojson }}, + "FilterGenotypes.output_prefix": {{ test_batch.name | tojson }}, + "FilterGenotypes.ploidy_table": {{ test_batch.ploidy_table | tojson }}, + "FilterGenotypes.truth_json": {{ test_batch.recalibrate_gq_truth_json | tojson }}, + + "FilterGenotypes.primary_contigs_fai": {{ reference_resources.primary_contigs_fai | tojson }}, + "FilterGenotypes.gq_recalibrator_model_file": {{ reference_resources.aou_recalibrate_gq_model_file | tojson }}, + "FilterGenotypes.genome_tracks": {{ reference_resources.recalibrate_gq_genome_tracks | tojson }}, + "FilterGenotypes.recalibrate_gq_args": [ + "--keep-homvar false", + "--keep-homref true", + "--keep-multiallelic true", + "--skip-genotype-filtering true", + "--min-samples-to-estimate-allele-frequency -1" + ], + + "FilterGenotypes.ped_file": {{ test_batch.ped_file | tojson }}, + "FilterGenotypes.site_level_comparison_datasets": [ + {{ reference_resources.ccdg_abel_site_level_benchmarking_dataset | tojson }}, + {{ reference_resources.gnomad_v2_collins_site_level_benchmarking_dataset | tojson }}, + {{ reference_resources.hgsv_byrska_bishop_site_level_benchmarking_dataset | tojson }}, + {{ reference_resources.thousand_genomes_site_level_benchmarking_dataset | tojson }} + ], + "FilterGenotypes.sample_level_comparison_datasets": [ + {{ reference_resources.hgsv_byrska_bishop_sample_level_benchmarking_dataset | tojson }} + ], + "FilterGenotypes.sample_renaming_tsv": {{ reference_resources.hgsv_byrska_bishop_sample_renaming_tsv | tojson }}, + "FilterGenotypes.runtime_override_per_sample_benchmark_plot": { + "mem_gb": 30, + "disk_gb": 50 + }, + + "FilterGenotypes.linux_docker": {{ dockers.linux_docker | tojson }}, + "FilterGenotypes.gatk_docker": {{ dockers.gq_recalibrator_docker | tojson }}, + "FilterGenotypes.sv_base_mini_docker": {{ dockers.sv_base_mini_docker | tojson }}, + "FilterGenotypes.sv_pipeline_docker": {{ dockers.sv_pipeline_docker | tojson }} +} diff --git a/inputs/templates/test/GqRecalibrator/MakeGqRecalibratorTrainingSetFromPacBio.json.tmpl b/inputs/templates/test/GqRecalibrator/MakeGqRecalibratorTrainingSetFromPacBio.json.tmpl new file mode 100644 index 000000000..822b73f03 --- /dev/null +++ b/inputs/templates/test/GqRecalibrator/MakeGqRecalibratorTrainingSetFromPacBio.json.tmpl @@ -0,0 +1,19 @@ +{ + "MakeGqRecalibratorTrainingSetFromPacBio.vcfs": [{{ test_batch.concordance_vcf | tojson }}], + "MakeGqRecalibratorTrainingSetFromPacBio.training_sample_ids": {{ test_batch.pacbio_samples_list | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.output_prefix": {{ test_batch.name | 
tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.ploidy_table": {{ test_batch.ploidy_table | tojson }}, + +"MakeGqRecalibratorTrainingSetFromPacBio.pacbio_sample_ids": {{ test_batch.pacbio_samples | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.vapor_files": {{ test_batch.vapor_files | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.pbsv_vcfs": {{ test_batch.pacbio_pbsv_vcfs | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.pav_vcfs": {{ test_batch.pacbio_pav_vcfs | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.sniffles_vcfs": {{ test_batch.pacbio_sniffles_vcfs | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.reference_dict": {{ reference_resources.reference_dict | tojson }}, + + "MakeGqRecalibratorTrainingSetFromPacBio.sv_utils_docker" : {{ dockers.sv_utils_docker | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.gatk_docker" : {{ dockers.gatk_docker | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.sv_base_mini_docker" : {{ dockers.sv_base_mini_docker | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.sv_pipeline_docker" : {{ dockers.sv_pipeline_docker | tojson }}, + "MakeGqRecalibratorTrainingSetFromPacBio.linux_docker" : {{ dockers.linux_docker | tojson }} +} diff --git a/inputs/values/dockers.json b/inputs/values/dockers.json index 4eb8cec68..cc5120df6 100644 --- a/inputs/values/dockers.json +++ b/inputs/values/dockers.json @@ -25,10 +25,10 @@ "pangenie_docker": "us.gcr.io/broad-dsde-methods/vjalili/pangenie:vj-127571f", "sv-base-virtual-env": "us.gcr.io/broad-dsde-methods/vjalili/sv-base-virtual-env:5994670", "cnmops-virtual-env": "us.gcr.io/broad-dsde-methods/vjalili/cnmops-virtual-env:5994670", - "sv-pipeline-virtual-env": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-pipeline-virtual-env:2024-01-24-v0.28.4-beta-9debd6d7", + "sv-pipeline-virtual-env": "us.gcr.io/broad-dsde-methods/markw/sv-pipeline-virtual-env:mw-train-genotype-filtering-a9479501", "samtools-cloud-virtual-env": "us.gcr.io/broad-dsde-methods/vjalili/samtools-cloud-virtual-env:5994670", "sv-utils-env": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-utils-env:2023-02-01-v0.26.8-beta-9b25c72d", - "sv_utils_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-utils:2024-01-24-v0.28.4-beta-9debd6d7", + "sv_utils_docker": "us.gcr.io/broad-dsde-methods/markw/sv-utils:mw-train-genotype-filtering-a9479501", "gq_recalibrator_docker": "us.gcr.io/broad-dsde-methods/markw/gatk:mw-tb-form-sv-filter-training-data-899360a", "str": "us.gcr.io/broad-dsde-methods/gatk-sv/str:2023-05-23-v0.27.3-beta-e537bdd6" } \ No newline at end of file diff --git a/inputs/values/hgdp.json b/inputs/values/hgdp.json index f50fcf2b4..6548fb18d 100644 --- a/inputs/values/hgdp.json +++ b/inputs/values/hgdp.json @@ -32,9 +32,9 @@ "concordance_vcf": "gs://gatk-sv-hgdp/mw-sv-concordance-update/hgdp.concordance.vcf.gz", "concordance_vcf_index": "gs://gatk-sv-hgdp/mw-sv-concordance-update/hgdp.concordance.vcf.gz.tbi", - "pacbio_sample_concordance_vcf": "gs://gatk-sv-hgdp/mw-sv-concordance-update/training/hgdp.concordance.subset.vcf.gz", - "pacbio_sample_concordance_vcf_index": "gs://gatk-sv-hgdp/mw-sv-concordance-update/training/hgdp.concordance.subset.vcf.gz.tbi", - "recalibrate_gq_truth_json": "gs://gatk-sv-hgdp/mw-sv-concordance-update/training/hgdp.gq_training_labels.json", + "pacbio_sample_concordance_vcf": "gs://gatk-sv-hgdp/mw-train-genotype-filtering/hgdp.pacbio_samples.vcf.gz", + "pacbio_sample_concordance_vcf_index": "gs://gatk-sv-hgdp/mw-train-genotype-filtering/hgdp.pacbio_samples.vcf.gz.tbi", + 
"recalibrate_gq_truth_json": "gs://gatk-sv-hgdp/mw-train-genotype-filtering/hgdp.gq_training_labels.json", "aou_recalibrated_vcf": "gs://gatk-sv-hgdp/mw-sv-concordance-update/hgdp.concordance.aou_gq_recalibrated.vcf.gz", "aou_recalibrated_vcf_index": "gs://gatk-sv-hgdp/mw-sv-concordance-update/hgdp.concordance.aou_gq_recalibrated.vcf.gz.tbi", diff --git a/src/sv-pipeline/scripts/format_pb_for_gatk.py b/src/sv-pipeline/scripts/format_pb_for_gatk.py new file mode 100644 index 000000000..c15d7ef02 --- /dev/null +++ b/src/sv-pipeline/scripts/format_pb_for_gatk.py @@ -0,0 +1,262 @@ +#!/bin/env python + +import argparse +import pysam +import sys +from typing import Any, List, Text, Dict, Optional + +_gt_sum_map = dict() + + +def _parse_ploidy_table(path: Text) -> Dict[Text, Dict[Text, int]]: + """ + Parses tsv of sample ploidy values. + + Parameters + ---------- + path: Text + table path + + Returns + ------- + header: Dict[Text, Dict[Text, int]] + map of sample to contig to ploidy, i.e. Dict[sample][contig] = ploidy + """ + ploidy_dict = dict() + with open(path, 'r') as f: + header = f.readline().strip().split('\t') + for line in f: + tokens = line.strip().split('\t') + sample = tokens[0] + ploidy_dict[sample] = {header[i]: int(tokens[i]) for i in range(1, len(header))} + return ploidy_dict + + +def create_header(header_in: pysam.VariantHeader) -> pysam.VariantHeader: + """ + Ingests the given header, removes specified fields, and adds necessary fields. + + Parameters + ---------- + header_in: pysam.VariantHeader + input header + + Returns + ------- + header: pysam.VariantHeader + gatk-style header + """ + header = pysam.VariantHeader() + for sample in header_in.samples: + header.add_sample(sample) + # new fields + header.add_line('##fileformat=VCFv4.2') + header.add_line('##FORMAT=') + header.add_line('##FORMAT=') + header.add_line('##FORMAT=') + header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + header.add_line('##contig=') + return header + + +def convert(record: pysam.VariantRecord, + vcf_out: pysam.VariantFile, + algorithm: Text, + min_size: int, + ploidy_dict: Dict[Text, Dict[Text, int]]) -> pysam.VariantRecord: + """ + Converts a record from for use in PacBio call comparisons. This includes updating all GT fields with proper + ploidy, adding necessary fields such as ECN and CN, enforcing min size, and updating the ALGORITHMS field. 
+ + Parameters + ---------- + record: pysam.VariantRecord + input record + vcf_out: pysam.VariantFile + new vcf, to which the converted record will be written + algorithm: Text + name of the pacbio caller + min_size: int + Minimum SV size + ploidy_dict: Dict[Text, Dict[Text, int]] + map from sample to contig to ploidy + + Returns + ------- + pysam.VariantRecord + reformatted record + """ + svtype = record.info['SVTYPE'] + if not supported_type(record): + return list() + contig = record.contig + if contig not in vcf_out.header.contigs: + return list() + # Force DUPs to insertions for consistency + isDup = svtype == 'DUP' + if isDup: + svtype = 'INS' + if 'SVLEN' in record.info and record.info['SVLEN'] is not None: + svlen = record.info['SVLEN'] + if isinstance(svlen, tuple): + svlen = svlen[0] + svlen = abs(int(svlen)) + if svtype == 'INS': + end = record.start + 1 + else: + end = record.start + svlen + else: + svlen = record.stop - record.pos + end = record.stop + if svlen < min_size: + return list() + if end <= record.start: + end = record.start + 1 + # Force symbolic allele + alleles = ["N", f"<{svtype}>"] + new_record = vcf_out.new_record(id=record.id, contig=contig, start=record.start, stop=end, alleles=alleles) + new_record.info['ALGORITHMS'] = [algorithm] + new_record.info['SVTYPE'] = svtype + # fix SVLEN, STRANDS, CHR2, and END2 where needed + if svtype == 'INS': + new_record.info['SVLEN'] = svlen + elif svtype == 'INV': + new_record.info['STRANDS'] = '++' + # copy FORMAT fields + for sample in record.samples: + genotype = record.samples[sample] + new_genotype = new_record.samples[sample] + ecn = ploidy_dict[sample][contig] + new_genotype['ECN'] = ecn + gt_sum = _cache_gt_sum(genotype['GT']) + if svtype == 'DEL': + new_genotype['CN'] = max(ecn - gt_sum, 0) + if new_genotype['ECN'] == 0: + new_genotype['GT'] = () + elif ecn == 1: + if gt_sum == 0: + new_genotype['GT'] = (0,) + else: + new_genotype['GT'] = (1,) + else: + if gt_sum == 0: + new_genotype['GT'] = (0, 0) + elif gt_sum == 1: + new_genotype['GT'] = (0, 1) + else: + new_genotype['GT'] = (1, 1) + out = [new_record] + return out + + +def _cache_gt_sum(gt): + s = _gt_sum_map.get(gt, None) + if s is None: + s = sum([1 for a in gt if a is not None and a > 0]) + _gt_sum_map[gt] = s + return s + + +def supported_type(record: pysam.VariantRecord) -> bool: + return record.info['SVTYPE'] in ['DEL', 'DUP', 'INS', 'INV'] + + +def _process(vcf_in: pysam.VariantFile, + vcf_out: pysam.VariantFile, + arguments: Dict[Text, Any]) -> None: + """" + Master function for processing the given input vcf and writing output + + Parameters + ---------- + vcf_in: pysam.VariantFile + input vcf + vcf_out: pysam.VariantFile + output vcf + arguments: Dict[Text, Any] + commandline arguments + """ + ploidy_dict = _parse_ploidy_table(arguments.ploidy_table) + + for record in vcf_in: + out = convert( + record=record, + vcf_out=vcf_out, + algorithm=arguments.algorithm, + min_size=arguments.min_size, + ploidy_dict=ploidy_dict + ) + for record in out: + vcf_out.write(record) + + +def _parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Convert a PacBio-derived SV VCF to SVTK-style", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--vcf", type=str, required=True, + help="Input VCF. 
Supported callers include: pbsv, pav, sniffles") + parser.add_argument("--out", type=str, required=True, + help="Output VCF") + parser.add_argument("--algorithm", type=str, required=True, + help="Algorithm name") + parser.add_argument("--min-size", type=int, default=25, + help="Min SV size") + parser.add_argument("--ploidy-table", type=str, required=True, + help="Tab-delimited table of sample ploidies. The table should have a header row where the " + "first column is SAMPLE, and the remaining columns are contig names. For each row " + "thereafter, the first column is the sample name, and remaining columns are the contig " + "ploidy values for that sample.") + + if len(argv) <= 1: + parser.parse_args(["--help"]) + sys.exit(0) + parsed_arguments = parser.parse_args(argv[1:]) + return parsed_arguments + + +def main(argv: Optional[List[Text]] = None): + if argv is None: + argv = sys.argv + arguments = _parse_arguments(argv) + + # convert vcf header and records + with pysam.VariantFile(arguments.vcf) as vcf_in: + header = create_header(vcf_in.header) + with pysam.VariantFile(arguments.out, mode='w', header=header) as vcf_out: + _process(vcf_in, vcf_out, arguments) + + +if __name__ == "__main__": + main() diff --git a/src/sv-pipeline/scripts/make_sl_table.py b/src/sv-pipeline/scripts/make_sl_table.py new file mode 100644 index 000000000..08defb1d0 --- /dev/null +++ b/src/sv-pipeline/scripts/make_sl_table.py @@ -0,0 +1,141 @@ +#!/bin/env python + +import argparse +from collections import Counter +import json +import gzip +import pysam +import sys +from typing import List, Text, Optional + +import numpy as np + + +def load_json(path): + with open(path) as f: + training_sites = json.load(f) + return { + s: {'good_variant_ids': set(training_sites[s]['good_variant_ids']), + 'bad_variant_ids': set(training_sites[s]['bad_variant_ids'])} for s in training_sites + } + + +def count_samples(vcf_path): + with pysam.VariantFile(vcf_path) as vcf: + return len(vcf.header.samples) + + +def create_vcf_tsv(out_path, truth_json_path, vcf_path, min_size_medium, + min_size_large, downsample_factor=None, labeled_only=False): + # Info fields to load (double-check these if you get an invalid header error) + info_fields = [ + 'END2', 'CHR2', 'ALGORITHMS', 'EVIDENCE', 'SVLEN', 'SVTYPE', 'AF', 'STATUS', 'NON_REF_GENOTYPE_CONCORDANCE', + 'ALGORITHMS' + ] + + # Format fields to load (double-check these if you get an invalid header error) + format_fields = [ + 'SAMPLE', 'GT', 'EV', 'GQ', 'SL', 'RD_GQ', 'SR_GQ', 'PE_GQ', 'OGQ' + ] + + # Types to load + valid_types = set(['DEL', 'DUP', 'INS', 'INV', 'CPX', 'CTX']) + + # Load truth labels + training_sites = load_json(truth_json_path) + + _gt_non_ref_or_no_call_map = dict() + + def _is_non_ref_or_no_call(gt): + s = _gt_non_ref_or_no_call_map.get(gt, None) + if s is None: + s = any([a is not None and a > 0 for a in gt]) or all([a is None for a in gt]) + _gt_non_ref_or_no_call_map[gt] = s + return s + + def _reformat_field(val, key): + if isinstance(val, tuple): + if key == 'GT': + return "/".join([str(e) for e in val]) + else: + return ",".join([str(e) for e in val]) + else: + return val + + # Parse vcf + type_counter = Counter() + zipped_out_path = out_path if out_path.endswith(".gz") else out_path + ".gz" + with pysam.VariantFile(vcf_path) as vcf, gzip.open(zipped_out_path, 'w') as fout: + # Write column names + fout.write(bytes("\t".join(['CHROM', 'POS', 'END', 'VID', 'FILTER_CLASS', 'LABEL'] + + format_fields + info_fields) + "\n", 'utf-8')) + samples = 
list(set(vcf.header.samples).intersection(set(training_sites.keys()))) + i = 0 + for r in vcf: + i += 1 + if downsample_factor is not None and i % downsample_factor > 0: + continue + svtype = r.info['SVTYPE'] + if svtype not in valid_types: + continue + svlen = r.info.get('SVLEN', None) + if svlen is None: + svlen = np.nan + r_class = svtype + if svtype in ['DEL', 'DUP']: + r_class += '_' + ('s' if svlen < min_size_medium else 'm' if svlen < min_size_large else 'l') + r_data = [[r.chrom, r.pos, r.stop, r.id, r_class, 1 if r.id in training_sites[s]['good_variant_ids'] + else 0 if r.id in training_sites[s]['bad_variant_ids'] else -1] + + [s if k == 'SAMPLE' else _reformat_field(r.samples[s].get(k, None), k) for k in format_fields] + + [_reformat_field(r.info.get(k, None), k) for k in info_fields] + for s in samples if _is_non_ref_or_no_call(r.samples[s]['GT']) and + ((not labeled_only) or r.id in training_sites[s]['good_variant_ids'] or + r.id in training_sites[s]['bad_variant_ids'])] + type_counter[r_class] += len(r_data) + if i % 10000 == 0: + print(f"Processed {i} records; position {r.chrom}:{r.pos}") + print("\t" + ", ".join(sorted([f"{key}: {val}" for key, val in type_counter.items()]))) + if len(r_data) > 0: + fout.write(bytes("\n".join(["\t".join([str(x) for x in entry]) for entry in r_data]) + "\n", 'utf-8')) + + +def _parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Writes VCF fields to tsv for consumption by optimize_sl_cutoffs.py", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument('--vcf', type=str, help='Input vcf with SL annotations') + parser.add_argument('--truth-json', type=str, help='Input truth json') + parser.add_argument('--out', type=str, help='Output table path, will be gzipped (.gz)') + parser.add_argument("--downsample-factor", type=int, + help="If provided, load only every DOWNSAMPLE_FACTOR variants from the vcf") + parser.add_argument("--labeled-only", action='store_true', + help="Limit output to labeled sites; intended for very large call sets with " + "sparsely labeled genotypes.") + parser.add_argument("--medium-size", type=float, default=500, + help="Min size for medium DEL/DUP") + parser.add_argument("--large-size", type=float, default=10000, + help="Min size for large DEL/DUP") + if len(argv) <= 1: + parser.parse_args(["--help"]) + sys.exit(0) + parsed_arguments = parser.parse_args(argv[1:]) + return parsed_arguments + + +def main(argv: Optional[List[Text]] = None): + if argv is None: + argv = sys.argv + args = _parse_arguments(argv) + + # Write out genotypes table to disk + print("Reading VCF and writing to data table...") + create_vcf_tsv(out_path=args.out, truth_json_path=args.truth_json, vcf_path=args.vcf, + min_size_medium=args.medium_size, min_size_large=args.large_size, + downsample_factor=args.downsample_factor, labeled_only=args.labeled_only) + print("Done!") + + +if __name__ == "__main__": + main() diff --git a/src/sv-pipeline/scripts/optimize_sl_cutoffs.py b/src/sv-pipeline/scripts/optimize_sl_cutoffs.py new file mode 100644 index 000000000..54b5b1481 --- /dev/null +++ b/src/sv-pipeline/scripts/optimize_sl_cutoffs.py @@ -0,0 +1,288 @@ +#!/bin/env python + +import argparse +import os +import sys +from typing import List, Text, Optional + +from matplotlib import pyplot as plt +import matplotlib.ticker as mticker +import numpy as np +import pandas as pd +import seaborn as sns +from sklearn import metrics + + +def load_df(path): + 
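For orientation, a minimal sketch of the gzipped table that make_sl_table.py above emits and that load_df below reads back; the column subset, the row values, and the example.sl_table.tsv.gz path are illustrative assumptions, not taken from the pipeline:

    import gzip

    import pandas as pd

    # A few of the columns written by make_sl_table.py; LABEL is 1 (good), 0 (bad), or -1 (unlabeled).
    header = ['CHROM', 'POS', 'END', 'VID', 'FILTER_CLASS', 'LABEL', 'SAMPLE', 'GT', 'SL', 'SVTYPE', 'SVLEN']
    row = ['chr1', '10000', '10500', 'cohort_DEL_1', 'DEL_m', '1', 'sample_01', '0/1', '4', 'DEL', '500']

    with gzip.open('example.sl_table.tsv.gz', 'wt') as f:  # hypothetical path
        f.write('\t'.join(header) + '\n')
        f.write('\t'.join(row) + '\n')

    # Read back the way load_df does: one row per non-ref genotype, indexed by VID_SAMPLE.
    df = pd.read_csv('example.sl_table.tsv.gz', sep='\t', compression='gzip')
    df['VID_SAMPLE'] = df['VID'] + '_' + df['SAMPLE']
    print(df.set_index('VID_SAMPLE')[['FILTER_CLASS', 'LABEL', 'SL']])
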
df = pd.read_csv(path, sep='\t', compression='gzip') + df['LABEL'] = df['LABEL'].astype(int) + for k in ['SVLEN', 'NON_REF_GENOTYPE_CONCORDANCE', 'GQ', 'OGQ', 'RD_GQ', 'PE_GQ', 'SR_GQ', 'AF']: + if k not in df.columns: + continue + df.loc[df[k] == 'None', k] = 'nan' + df[k] = df[k].astype(float) + df['VID_SAMPLE'] = df['VID'] + "_" + df['SAMPLE'] + df = df.set_index('VID_SAMPLE') + return df + + +def plot_hist(data, x, out_dir, out_name, hue=None, row=None, col=None, bins=20, stat='count'): + sns.set_palette("muted") + g = sns.displot( + data, x=x, + hue=hue, + row=row, + col=col, + stat=stat, + multiple="stack", + edgecolor=".3", + linewidth=.5, + height=3, aspect=1.7, + bins=bins + ) + if str(data[x].dtype) == 'object': + rotation = 90 + for i, ax in enumerate(g.fig.axes): + ticks_loc = ax.get_xticks() + ax.xaxis.set_major_locator(mticker.FixedLocator(ticks_loc)) + ax.set_xticklabels([x for x in ax.get_xticklabels()], rotation=rotation) + plt.savefig(fname=os.path.join(out_dir, f"{out_name}.histogram.{x}.png"), format="png", + dpi=300, bbox_inches="tight") + + +# General QC: +# Check that there are sufficient labeled data across SV classes and sizes +# and that the SL scores are sensible (more negative for FP, more positive for TP) +def generate_qc_plots(df, out_dir, out_name): + + # Copy and perform any modifications to the dataset here + df['LOG_SVLEN'] = np.log10(df['SVLEN']) + df['TRUTH_LABEL'] = 'UNLABELED' + df.loc[df['LABEL'] == 0, 'TRUTH_LABEL'] = 'FP' + df.loc[df['LABEL'] == 1, 'TRUTH_LABEL'] = 'TP' + df['TRUTH_LABEL'] = pd.Categorical(df['TRUTH_LABEL'], ['TP', 'FP', 'UNLABELED']) + + n_total = df.shape[0] + n_true = df[df['LABEL'] == 1].shape[0] + n_false = df[df['LABEL'] == 0].shape[0] + n_unl = df[df['LABEL'] == -1].shape[0] + print(f"True: {n_true} ({100*n_true/float(n_total)}%)") + print(f"False: {n_false} ({100*n_false/float(n_total)}%)") + print(f"Unlabeled: {n_unl} ({100*n_unl/float(n_total)}%)") + print(f"Total: {n_total}") + + # Plot + plot_hist(df, 'NON_REF_GENOTYPE_CONCORDANCE', hue='TRUTH_LABEL', bins=30, out_dir=out_dir, out_name=out_name) + plot_hist(df, 'TRUTH_LABEL', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'LOG_SVLEN', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'SVTYPE', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'FILTER_CLASS', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'AF', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'SL', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'STATUS', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'NON_REF_GENOTYPE_CONCORDANCE', hue='TRUTH_LABEL', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'ALGORITHMS', hue='TRUTH_LABEL', col='SVTYPE', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'OGQ', hue='TRUTH_LABEL', col='SVTYPE', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'RD_GQ', hue='TRUTH_LABEL', col='SVTYPE', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'PE_GQ', hue='TRUTH_LABEL', col='SVTYPE', out_dir=out_dir, out_name=out_name) + plot_hist(df, 'SR_GQ', hue='TRUTH_LABEL', col='SVTYPE', out_dir=out_dir, out_name=out_name) + + +# Plots precision / recall curves and performs cutoff optimization based on max F-score +def plot_precision_recall(data, name, out_dir, out_name, reverse=False, beta=1, plot=True, n_samples=None): + results = dict() + keys = [ + 'ALL', + 'DEL_s', + 'DEL_m', + 'DEL_l', + 'DUP_s', + 'DUP_m', + 
'DUP_l', + 'INS', + 'INV', + 'CNV', + 'BND', + 'CTX', + 'CPX'] + for key in keys: + if key == 'ALL': + df2 = data[data['FILTER_CLASS'] != 'BND'] + else: + df2 = data[(data['FILTER_CLASS'] == key)] + y_scores = df2[name] + not_nan = ~y_scores.isnull() + y_scores = y_scores[not_nan] + results[key] = dict() + results[key]['ppv'] = [] + results[key]['tpr'] = [] + results[key]['thresholds'] = [] + if len(y_scores) <= 1: + continue + if reverse: + y_scores = -y_scores + y_labels = df2.loc[not_nan, 'LABEL'] + ppv_i, tpr_i, thresholds_i = metrics.precision_recall_curve(y_labels, y_scores, pos_label=1) + + pos_counts, pos_bins = np.histogram(y_scores, bins=100) + if reverse: + pos_counts = np.cumsum(pos_counts) + else: + pos_counts = np.sum(pos_counts) - np.cumsum(pos_counts) + if n_samples is not None: + pos_counts = pos_counts / n_samples + + if len(thresholds_i) <= 1: + continue + fbeta = (1 + beta * beta) * tpr_i * ppv_i / (tpr_i + (beta * beta * ppv_i)) + fmax = np.nanmax(fbeta) + fmax_index = np.nanargmax(fbeta) + fmax_threshold = thresholds_i[fmax_index] + fmax_tpr = tpr_i[fmax_index] + fmax_ppv = ppv_i[fmax_index] + fmax_pos = np.interp(fmax_threshold, pos_bins[:-1], pos_counts) + + n = y_labels.shape[0] + if reverse: + fmax_threshold = -fmax_threshold + results[key]['fmax'] = fmax + results[key]['fmax_thresh'] = fmax_threshold + results[key]['fmax_rec'] = fmax_tpr + results[key]['fmax_prec'] = fmax_ppv + results[key]['fmax_pos'] = fmax_pos + + results[key]['ppv'] = ppv_i + results[key]['tpr'] = tpr_i + results[key]['thresh'] = thresholds_i + results[key]['pos_counts'] = pos_counts + results[key]['pos_bins'] = pos_bins + results[key]['n'] = n + + if not plot: + return + keys = [k for k in results.keys() if len(results[k]['tpr']) > 0] + plt.figure(figsize=(16, 3)) + for k in keys: + r = results[k] + plt.subplot(1, 4, 1) + plt.step(r['tpr'], r['ppv']) + plt.xlabel('recall') + plt.ylabel('precision') + plt.legend(keys) + t = np.asarray(r['thresh']) + t_pos = np.asarray(r['pos_bins']) + if reverse: + t = -t + t_pos = -t_pos + plt.subplot(1, 4, 2) + plt.step(t, r['tpr'][:-1], where='pre') + plt.xlabel(name) + plt.ylabel('recall') + plt.subplot(1, 4, 3) + plt.step(t, r['ppv'][:-1], where='pre') + plt.xlabel(name) + plt.ylabel('precision') + + plt.subplot(1, 4, 4) + plt.step(t_pos[:-1], r['pos_counts'], where='pre') + plt.xlabel(name) + if n_samples is None: + plt.ylabel('count') + else: + plt.ylabel('n_per_genome') + + plt.savefig(fname=os.path.join(out_dir, f"{out_name}.precision_recall.png"), format="png", + dpi=300, bbox_inches="tight") + return results + + +def write_stats(out_path, stats, stat_name): + with open(out_path, 'w') as f: + f.write("\t".join(['class', 'n', 'f_max', 'rec', 'prec', 'count', stat_name + '_thresh']) + "\n") + for key in stats: + if 'n' not in stats[key]: + continue + f.write("\t".join([key, str(stats[key]['n'])] + + ["{:.3f}".format(x) for x in [stats[key]['fmax'], stats[key]['fmax_rec'], + stats[key]['fmax_prec']]] + + [f"{stats[key]['fmax_pos']:.0f}", f"{stats[key]['fmax_thresh']:.0f}"]) + "\n") + total = sum([stats[key]['fmax_pos'] for key in stats if 'fmax_pos' in stats[key] and key != 'ALL']) + f.write(f"Total count across subclasses: {total:0.0f}\n") + + +# The next step is to filter genotypes using the FilterRecalibratedVcf workflow +# Use fmax thresholds to set the SL cutoffs +def write_sl_filter_args(out_path, stats): + key_to_arg_map = { + 'DEL_s': 'small-del-threshold', + 'DEL_m': 'medium-del-threshold', + 'DEL_l': 'large-del-threshold', + 'DUP_s': 
'small-dup-threshold', + 'DUP_m': 'medium-dup-threshold', + 'DUP_l': 'large-dup-threshold', + 'INS': 'ins-threshold', + 'INV': 'inv-threshold', + 'BND': 'bnd-threshold', + 'CPX': 'cpx-threshold', + 'CTX': 'ctx-threshold' + } + suggested_filter_args = [] + for key in stats: + if key == 'ALL' or 'fmax_thresh' not in stats[key]: + continue + suggested_filter_args.append(f"--{key_to_arg_map[key]} {stats[key]['fmax_thresh']}") + with open(out_path, 'w') as f: + f.write(" ".join(suggested_filter_args) + "\n") + + +def _parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Optimizes cutoffs for SL filtering", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument('--table', type=str, help='Gzipped input record table with SL annotations') + parser.add_argument('--out-dir', type=str, help='Output directory', default="./") + parser.add_argument('--out-name', type=str, help='Output filename base', default="filter_qc") + parser.add_argument("--beta", type=float, default=1.0, + help="Beta parameter for F score (higher values will weight recall more over precision, " + "see https://en.wikipedia.org/wiki/F-score)") + if len(argv) <= 1: + parser.parse_args(["--help"]) + sys.exit(0) + parsed_arguments = parser.parse_args(argv[1:]) + return parsed_arguments + + +def main(argv: Optional[List[Text]] = None): + if argv is None: + argv = sys.argv + args = _parse_arguments(argv) + + # Read table into memory + print("Loading data table...") + df = load_df(args.table) + + # QC plotting + print("Creating QC plots...") + generate_qc_plots(df=df, out_dir=args.out_dir, out_name=args.out_name) + + # Perform filtering optimizations on SL score + # Use plots and/or fmax to choose SL thresholds for each class + + # Remove unlabeled calls + plot_df = df[df['LABEL'] != -1] + + # Calculate fmax and produce precision-recall plots across SL cutoffs + stat_name = "SL" + print("Optimizing cutoffs...") + stats = plot_precision_recall(plot_df, stat_name, out_dir=args.out_dir, out_name=args.out_name, beta=args.beta) + print("Writing stats file...") + write_stats(out_path=os.path.join(args.out_dir, f"{args.out_name}.stats.txt"), stats=stats, stat_name=stat_name) + print("Writing arguments file...") + write_sl_filter_args(out_path=os.path.join(args.out_dir, f"{args.out_name}.filter_args.txt"), stats=stats) + print("Done!") + + +if __name__ == "__main__": + main() diff --git a/src/sv-pipeline/scripts/preprocess_gatk_for_pacbio_eval.py b/src/sv-pipeline/scripts/preprocess_gatk_for_pacbio_eval.py new file mode 100644 index 000000000..e7e6d16b4 --- /dev/null +++ b/src/sv-pipeline/scripts/preprocess_gatk_for_pacbio_eval.py @@ -0,0 +1,56 @@ +#!/bin/env python + +import argparse +import pysam +import sys +from typing import List, Text, Optional + + +def _parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Prepare call set for evaluation against PacBio variants, including subset to DEL/DUP/INS, " + "converting DUP to INS, and filtering variants over 5kbp", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("vcf", type=str, + help="Input VCF. 
Usually this will be the cleaned vcf.") + + if len(argv) <= 1: + parser.parse_args(["--help"]) + sys.exit(0) + parsed_arguments = parser.parse_args(argv[1:]) + return parsed_arguments + + +def process(vcf: pysam.VariantFile) -> None: + sys.stdout.write(str(vcf.header)) + allowed_svtypes = set(['DEL', 'DUP', 'INS', 'INV']) + for record in vcf: + svtype = record.info['SVTYPE'] + if svtype not in allowed_svtypes: + continue + svlen = record.info.get('SVLEN', record.stop - record.pos) + if svlen > 5000: + continue + if svtype != 'DUP': + sys.stdout.write(str(record)) + else: + record.info['SVLEN'] = record.stop - record.pos + record.info['SVTYPE'] = 'INS' + record.stop = record.pos + 1 + sys.stdout.write(str(record).replace("", "")) + + +def main(argv: Optional[List[Text]] = None): + if argv is None: + argv = sys.argv + arguments = _parse_arguments(argv) + + # convert vcf header and records + with pysam.VariantFile(arguments.vcf) as vcf: + process(vcf) + + +if __name__ == "__main__": + main() diff --git a/src/sv-pipeline/scripts/refine_training_set.py b/src/sv-pipeline/scripts/refine_training_set.py new file mode 100644 index 000000000..42d5a7e54 --- /dev/null +++ b/src/sv-pipeline/scripts/refine_training_set.py @@ -0,0 +1,194 @@ +#!/bin/env python + +from collections import defaultdict +import argparse +import json +import pysam +import sys +from typing import List, Text, Set, Dict, Optional, Tuple + +""" +Creates a GQRecalibrator training sites json file with SV calls from PacBio data. + +Looks for calls clustered together in the input vcfs and comes up with consensus positive/negative labels +for small/med dup/del/ins calls. Note large del/dup/ins and all inv from the input json are passed through +automatically. Other SV types are not included. +""" + +GOOD_VARIANTS_KEY = 'good_variant_ids' +BAD_VARIANTS_KEY = 'bad_variant_ids' +VAPOR_LABEL = 'vapor' + + +def parse_vapor_labels(json_path: Text, + sample_id: Text) -> Dict: + # Load good/bad training site lists for the sample + true_labels = set() + false_labels = set() + with open(json_path) as f: + training_json = json.load(f) + if sample_id not in training_json: + raise ValueError(f"Could not find sample {sample_id} in training json {json_path}") + for vid in training_json[sample_id][GOOD_VARIANTS_KEY]: + true_labels.add(vid) + for vid in training_json[sample_id][BAD_VARIANTS_KEY]: + false_labels.add(vid) + return true_labels, false_labels + + +def get_vids_and_bypass(vcf: pysam.VariantFile, + cnv_size_cutoff: int, + non_cnv_size_cutoff: int) -> Tuple[Set[Text], Set[Text]]: + vids = set() + large_cnv_vids = set() + bypass_vids = set() + cnv_types = ['DEL', 'DUP'] + for record in vcf: + vids.add(record.id) + svtype = record.info['SVTYPE'] + svlen = record.info['SVLEN'] if 'SVLEN' in record.info else record.stop - record.pos + if svtype in cnv_types and svlen >= cnv_size_cutoff: + large_cnv_vids.add(record.id) + if svtype in ['INS', 'INV'] and svlen >= non_cnv_size_cutoff: + bypass_vids.add(record.id) + return vids, large_cnv_vids, bypass_vids + + +def parse_truth_support(labels: Dict[Text, List[Text]], + vcf: pysam.VariantFile, + main_vids: Set[Text], + truth_alg: Text) -> Dict: + for record in vcf: + if record.id in main_vids and record.info.get("STATUS", "") == "TP": + labels[record.id].append(truth_alg) + + +def refine_labels(vids: Set[Text], + sample_id: Text, + large_cnv_vids: Set[Text], + bypass_vids: Set[Text], + vapor_true: Set[Text], + vapor_false: Set[Text], + strict_labels: Dict[Text, List[Text]], + loose_labels: Dict[Text, 
List[Text]], + strict_support_threshold: int, + loose_support_threshold: int) -> Dict: + labels = {GOOD_VARIANTS_KEY: list(), BAD_VARIANTS_KEY: list()} + for vid in vids: + in_vapor_true = vid in vapor_true + in_vapor_false = vid in vapor_false + if vid in large_cnv_vids: + # Calls where we are going to use array-based label from the vapor file + if in_vapor_true: + labels[GOOD_VARIANTS_KEY].append(vid) + elif in_vapor_false: + labels[BAD_VARIANTS_KEY].append(vid) + elif (vid in bypass_vids) or not (in_vapor_true or in_vapor_false): + # Vapor no-calls and unlabelable variants (large INV/INS) + continue + else: + vapor_score = 1 if in_vapor_true else 0 + support_strict = len(strict_labels[vid]) + vapor_score + support_loose = len(loose_labels[vid]) + vapor_score + if support_strict >= strict_support_threshold: + labels[GOOD_VARIANTS_KEY].append(vid) + elif support_loose <= loose_support_threshold: + labels[BAD_VARIANTS_KEY].append(vid) + return {sample_id: labels} + + +def _parse_arg_list(arg: Text) -> List[Text]: + if arg is None: + return set() + else: + return arg.split(',') + + +def _parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Refine truth set labels using a VCF containing clustered variants.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--loose-concordance-vcfs", nargs='+', default=[], required=True, + help="Space-separated list of vcfs, this will be the cleaned vcf evaluated with " + "SVConcordance using loose concordance parameters against each tool") + parser.add_argument("--strict-concordance-vcfs", nargs='+', default=[], required=True, + help="Space-separated list of vcfs, this will be the cleaned vcf evaluated with " + " SVConcordance using strict concordance parameters against each tool") + parser.add_argument("--main-vcf", type=str, required=True, help="Usually this will be the cleaned vcf") + parser.add_argument("--vapor-json", type=str, required=True, help="Vapor truth set json") + parser.add_argument("--json-out", type=str, required=True, help="Output json path") + parser.add_argument("--table-out", type=str, required=True, help="Output tsv path") + parser.add_argument("--sample-id", type=str, required=True, help="Sample id") + parser.add_argument("--min-strict-algorithm-count", type=int, default=1, + help="Minimum number of strictly concordant truth algorithms required for call to be true") + parser.add_argument("--max-loose-algorithm-count", type=int, default=0, + help="Maximum number of loosely concordant truth algorithms allowed for call to be false") + parser.add_argument("--cnv-size-cutoff", type=int, default=5000, + help="Retain DEL and DUP variants in the input truth json that are above this size") + parser.add_argument("--non-cnv-size-cutoff", type=int, default=5000, + help="Retain INS/INV variants in the input truth json that are above this size") + parser.add_argument("--truth-algorithms", type=str, default="pbsv,sniffles,pav", + help="Comma-delimited list of truth ALGORITHMS values") + + if len(argv) <= 1: + parser.parse_args(["--help"]) + sys.exit(0) + parsed_arguments = parser.parse_args(argv[1:]) + if len(parsed_arguments.loose_concordance_vcfs) != len(parsed_arguments.strict_concordance_vcfs): + raise ValueError("Must have same number of strict concordance vcfs as loose concordance vcfs") + return parsed_arguments + + +def main(argv: Optional[List[Text]] = None): + if argv is None: + argv = sys.argv + arguments = 
_parse_arguments(argv) + + # Training json labels + vapor_true, vapor_false = parse_vapor_labels(json_path=arguments.vapor_json, sample_id=arguments.sample_id) + + # Get vids we're looking for + with pysam.VariantFile(arguments.main_vcf) as vcf: + main_vids, large_cnv_vids, bypass_vids = \ + get_vids_and_bypass(vcf, cnv_size_cutoff=arguments.cnv_size_cutoff, + non_cnv_size_cutoff=arguments.non_cnv_size_cutoff) + + # Parse clustered vcfs and generate labels + truth_algs = _parse_arg_list(arguments.truth_algorithms) + strict_labels = defaultdict(list) + if len(arguments.loose_concordance_vcfs) != len(truth_algs): + raise ValueError("Must enter same number of strict and loose concordance vcfs as truth algorithms") + for i in range(len(arguments.strict_concordance_vcfs)): + with pysam.VariantFile(arguments.strict_concordance_vcfs[i]) as vcf: + parse_truth_support(labels=strict_labels, vcf=vcf, main_vids=main_vids, truth_alg=truth_algs[i]) + loose_labels = defaultdict(list) + for i in range(len(arguments.loose_concordance_vcfs)): + with pysam.VariantFile(arguments.loose_concordance_vcfs[i]) as vcf: + parse_truth_support(labels=loose_labels, vcf=vcf, main_vids=main_vids, truth_alg=truth_algs[i]) + + # Get consensus + refined_labels = refine_labels(vids=main_vids, sample_id=arguments.sample_id, large_cnv_vids=large_cnv_vids, + bypass_vids=bypass_vids, + vapor_true=vapor_true, vapor_false=vapor_false, + strict_labels=strict_labels, loose_labels=loose_labels, + strict_support_threshold=arguments.min_strict_algorithm_count, + loose_support_threshold=arguments.max_loose_algorithm_count) + with open(arguments.json_out, 'w') as f: + f.write(json.dumps(refined_labels)) + refined_label_sets = {key: set(val) for key, val in refined_labels[arguments.sample_id].items()} + with open(arguments.table_out, 'w') as f: + f.write("vid\tsample\tlabel\tvapor\t" + "\t".join([f"{a}_strict" for a in truth_algs]) + + "\t" + "\t".join([f"{a}_loose" for a in truth_algs]) + "\n") + for vid in main_vids: + label_col = "1" if vid in refined_label_sets[GOOD_VARIANTS_KEY] \ + else "0" if vid in refined_label_sets[BAD_VARIANTS_KEY] else "NA" + vapor_col = "1" if vid in vapor_true else "0" if vid in vapor_false else "NA" + strict_cols = "\t".join(["1" if alg in strict_labels[vid] else "0" for alg in truth_algs]) + loose_cols = "\t".join(["1" if alg in loose_labels[vid] else "0" for alg in truth_algs]) + f.write(f"{vid}\t{arguments.sample_id}\t{label_col}\t{vapor_col}\t{strict_cols}\t{loose_cols}\n") + + +if __name__ == "__main__": + main() diff --git a/src/sv_utils/src/sv_utils/benchmark_variant_filter.py b/src/sv_utils/src/sv_utils/benchmark_variant_filter.py index c853df3a1..03d511434 100644 --- a/src/sv_utils/src/sv_utils/benchmark_variant_filter.py +++ b/src/sv_utils/src/sv_utils/benchmark_variant_filter.py @@ -1103,7 +1103,7 @@ def _draw_extra_legend_axis( ax.set_yticks([]) bbox = ax.get_tightbbox(renderer=fig.canvas.get_renderer()) approximate_ax_width_chars = bbox.width / legend_font_size - wrap_width = approximate_ax_width_chars + wrap_width = int(approximate_ax_width_chars) for __ in range(advance_color_cycle): # unlabeled lines to advance color cycle ax.plot([numpy.nan, numpy.nan], [numpy.nan, numpy.nan]) diff --git a/src/sv_utils/src/sv_utils/get_confident_variant_lists_from_vapor_and_irs_test.py b/src/sv_utils/src/sv_utils/get_confident_variant_lists_from_vapor_and_irs_test.py new file mode 100755 index 000000000..ce6ec5b8f --- /dev/null +++ 
b/src/sv_utils/src/sv_utils/get_confident_variant_lists_from_vapor_and_irs_test.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python + +import sys +import argparse +import logging + +import pandas as pd + +from pysam import VariantFile, VariantRecord + +from sv_utils import genomics_io, get_truth_overlap +from typing import List, Text, Optional, Dict, Tuple, Iterable, Set, Mapping + + +def get_confident_variants_vapor(vapor_files: Optional[Dict[str, str]], + precision: float, + valid_variant_ids: set, + strategy: str, + read_strategy_good_support_threshold: int, + read_strategy_bad_support_threshold: int, + read_strategy_bad_cov_threshold: int + ) -> get_truth_overlap.ConfidentVariants: + vapor_info = { + sample_id: + get_truth_overlap.select_confident_vapor_variants(vapor_file=vapor_file, + valid_variant_ids=valid_variant_ids, + precision=precision, + strategy=strategy, + read_strategy_good_support_threshold=read_strategy_good_support_threshold, + read_strategy_bad_support_threshold=read_strategy_bad_support_threshold, + read_strategy_bad_cov_threshold=read_strategy_bad_cov_threshold) + for sample_id, vapor_file in vapor_files.items() + } + return vapor_info + + +# non veriant genotypes (taken from svtk.utils) +NULL_GT = [(0, 0), (None, None), (0, ), (None, )] + + +def get_called_samples(record: VariantRecord) -> set: + samples = set() + for sample in record.samples.keys(): + if record.samples[sample]['GT'] not in NULL_GT: + samples.add(sample) + return samples + + +def get_irs_sample_confident_variants(vcf: str, + valid_irs_variant_ids: set, + samples_list_to_report_mapping: Mapping[Set[str], Tuple[Set[str], Tuple[Set[str]]]]) \ + -> get_truth_overlap.ConfidentVariants: + irs_confident_variants = {} + matched_record_ids = 0 + matched_record_ids_good = 0 + matched_record_ids_bad = 0 + matched_samples_good = 0 + matched_samples_bad = 0 + total_calls = 0 + with VariantFile(vcf) as vcf: + for record in vcf: + if record.id in valid_irs_variant_ids: + matched_record_ids += 1 + called_samples = get_called_samples(record) + total_calls += len(called_samples) + for sample_list in samples_list_to_report_mapping: + if record.id in samples_list_to_report_mapping[sample_list][0]: + matched_record_ids_good += 1 + for sample in called_samples: + if sample in sample_list: + matched_samples_good += 1 + if sample not in irs_confident_variants: + irs_confident_variants[sample] = \ + get_truth_overlap.SampleConfidentVariants(good_variant_ids={record.id}, + bad_variant_ids=set()) + else: + new_good_ids = set(irs_confident_variants[sample].__dict__['good_variant_ids']) + new_good_ids.add(record.id) + irs_confident_variants[sample] = \ + get_truth_overlap.SampleConfidentVariants( + good_variant_ids=new_good_ids, + bad_variant_ids=set(irs_confident_variants[sample].__dict__['bad_variant_ids'])) + if record.id in samples_list_to_report_mapping[sample_list][1]: + matched_record_ids_bad += 1 + for sample in called_samples: + if sample in sample_list: + matched_samples_bad += 1 + if sample not in irs_confident_variants: + irs_confident_variants[sample] = \ + get_truth_overlap.SampleConfidentVariants(good_variant_ids=set(), + bad_variant_ids={record.id}) + else: + new_bad_ids = set(irs_confident_variants[sample].__dict__['bad_variant_ids']) + new_bad_ids.add(record.id) + irs_confident_variants[sample] = \ + get_truth_overlap.SampleConfidentVariants( + good_variant_ids=set(irs_confident_variants[sample].__dict__['good_variant_ids']), + bad_variant_ids=new_bad_ids) + logging.info(f"Valid vcf IRS record ids: 
{matched_record_ids}") + logging.info(f"Total calls in valid IRS variants: {total_calls}") + logging.info(f"Number times a good variant was matched in an IRS batch: {matched_record_ids_good}") + logging.info(f"Number times a bad variant was matched in an IRS batch: {matched_record_ids_bad}") + logging.info(f"Matched good IRS variant sample calls: {matched_samples_good}") + logging.info(f"Matched bad IRS variant sample calls: {matched_samples_bad}") + return irs_confident_variants + + +def get_confident_variant_ids_from_irs_report(irs_test_report: str, + irs_good_pvalue_threshold: float, + irs_bad_pvalue_threshold: float, + min_probes: int, + valid_irs_variant_ids: set) -> Tuple[Set[str], Set[str]]: + irs_results = genomics_io.tsv_to_pandas(data_file=irs_test_report, require_header=True, header_start='') + irs_results.set_index("ID", inplace=True) + irs_good_variant_ids = valid_irs_variant_ids.intersection( + irs_results.loc[(irs_results["NPROBES"] >= min_probes) & + (pd.notna(irs_results["PVALUE"])) & + (irs_results["PVALUE"] <= irs_good_pvalue_threshold)].index + ) + irs_bad_variant_ids = valid_irs_variant_ids.intersection( + irs_results.loc[(irs_results["NPROBES"] >= min_probes) & + (pd.notna(irs_results["PVALUE"])) & + (irs_results["PVALUE"] >= irs_bad_pvalue_threshold)].index + ) + + return irs_good_variant_ids, irs_bad_variant_ids + + +def read_list_file(path: str) -> Iterable[str]: + with open(path, 'r') as f: + items = [line for line in f.read().splitlines() if line] + if len(items) == 0: + raise ValueError("list empty") + return items + + +def __parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Get lists of confident variant calls per sample given VaPoR and Array IRS Test inputs", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + prog=argv[0] + ) + parser.add_argument("--vcf", "-v", type=str, + help="GATK-SV VCF file.", required=True) + parser.add_argument("--vapor-json", "-j", type=str, + help="Json file with mapping from sample ID to corresponding VaPoR file.", required=True) + parser.add_argument("--vapor-min-precision", type=float, default=get_truth_overlap.Default.min_vapor_precision, + help="Minimum allowed precision for selecting good or bad variants from VaPoR") + parser.add_argument("--vapor-strategy", type=str, default="READS", + help="GQ (just based on VaPoR_GQ), GT (based on re-genotyped model), or READS (based on read support threshold)") + parser.add_argument("--vapor-read-support-pos-thresh", type=int, default=2, + help="Min Number of supporting vapor reads required for positive example") + parser.add_argument("--vapor-read-support-neg-thresh", type=int, default=0, + help="Max number of supporting vapor reads required for neg example") + parser.add_argument("--vapor-read-support-neg-cov-thresh", type=int, default=5, + help="Max number of covering vapor reads required for neg example") + parser.add_argument("--irs-output", type=str, default="-", + help="File to output IRS results to. If omitted or set to '-', print to stdout") + parser.add_argument("--vapor-output", type=str, default="-", + help="File to output Vapor results to. 
If omitted or set to '-', print to stdout") + parser.add_argument("--vapor-max-cnv-size", type=int, default=5000, + help="Maximum size CNV to trust vapor results for") + parser.add_argument("--irs-sample-batch-lists", type=str, + help="list of lists of samples used in each IRS test batch") + parser.add_argument("--irs-contigs-file", type=str, + help="list of contigs to restrict IRS variants to") + parser.add_argument("--irs-test-report-list", type=str, + help="list of IRS results files") + parser.add_argument("--irs-good-pvalue-threshold", type=float, default=0.000001, + help="Maximum pvalue to choose a good record from the IRS report") + parser.add_argument("--irs-bad-pvalue-threshold", type=float, default=0.2, + help="Minimum pvalue to choose a bad record from the IRS report") + parser.add_argument("--irs-min-probes", type=int, default=5, + help="IRS results file") + parser.add_argument("--irs-min-cnv-size", type=int, default=10000, + help="Minimum size CNV to trust IRS results for") + parser.add_argument("-l", "--log-level", required=False, default="INFO", + help="Specify level of logging information, ie. info, warning, error (not case-sensitive). " + "Default: INFO") + + parsed_arguments = parser.parse_args(argv[1:] if len(argv) > 1 else ["--help"]) + return parsed_arguments + + +def main(argv: Optional[List[Text]] = None) -> get_truth_overlap.ConfidentVariants: + arguments = __parse_arguments(sys.argv if argv is None else argv) + + logging.basicConfig(format='[%(levelname)s:%(asctime)s] %(message)s', level=getattr(logging, arguments.log_level)) + + valid_vapor_variant_ids = set() + valid_irs_variant_ids = set() + + if arguments.irs_contigs_file is not None: + irs_contigs = frozenset([line.split("\t")[0] for line in read_list_file(arguments.irs_contigs_file)]) + else: + irs_contigs = None + + # scan the vcf to get a list of valid variants + with VariantFile(arguments.vcf) as vcf: + for record in vcf: + svtype = record.info['SVTYPE'] + if not (svtype == 'DEL' or svtype == 'DUP') or record.info['SVLEN'] <= arguments.vapor_max_cnv_size: + valid_vapor_variant_ids.add(record.id) + if (svtype == 'DEL' or svtype == 'DUP') and record.info['SVLEN'] >= arguments.irs_min_cnv_size \ + and (irs_contigs is None or record.contig in irs_contigs): + valid_irs_variant_ids.add(record.id) + + if arguments.irs_sample_batch_lists is not None: + batch_list_paths = read_list_file(arguments.irs_sample_batch_lists) + irs_report_paths = read_list_file(arguments.irs_test_report_list) + + samples_list_to_confident_irs_variant_ids_mapping = { + frozenset(read_list_file(sample_list)): + get_confident_variant_ids_from_irs_report(report_list, + arguments.irs_good_pvalue_threshold, + arguments.irs_bad_pvalue_threshold, + arguments.irs_min_probes, + valid_irs_variant_ids) + for sample_list, report_list in zip(batch_list_paths, irs_report_paths) + } + logging.debug("IRS dump:") + logging.debug(str(samples_list_to_confident_irs_variant_ids_mapping)) + logging.debug("IRS sources:") + for x, y in zip(batch_list_paths, irs_report_paths): + logging.debug(f"{x}\t{y}") + + # for each variant in the IRS table that passes filters as good, + # find all non ref samples and add variant ID to good list + logging.info(f"Sample sets: {len(samples_list_to_confident_irs_variant_ids_mapping)}") + logging.info(f"Valid IRS variant ids: {len(valid_irs_variant_ids)}") + irs_sample_confident_variants = get_irs_sample_confident_variants(arguments.vcf, + valid_irs_variant_ids, + samples_list_to_confident_irs_variant_ids_mapping) + else: + 
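Just before get_vapor_files is called below, a minimal sketch of the --vapor-json input described above, i.e. a JSON mapping from sample ID to that sample's VaPoR output file; the sample IDs and file paths are invented for illustration:

    import json

    # Invented example of the --vapor-json mapping consumed via get_truth_overlap.get_vapor_files.
    vapor_files_by_sample = {
        "sample_01": "gs://my-bucket/vapor/sample_01.vapor.tsv.gz",
        "sample_02": "gs://my-bucket/vapor/sample_02.vapor.tsv.gz",
    }
    with open("vapor_files.json", "w") as f:  # hypothetical path passed as --vapor-json
        json.dump(vapor_files_by_sample, f, indent=2)
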
logging.info("No IRS batches were provided.") + irs_sample_confident_variants = {} + logging.info(f"Samples with confident IRS variants: {len(irs_sample_confident_variants)}") + + vapor_files = get_truth_overlap.get_vapor_files(arguments.vapor_json) + + vapor_confident_variants = get_confident_variants_vapor( + vapor_files=vapor_files, + precision=arguments.vapor_min_precision, + valid_variant_ids=valid_vapor_variant_ids, + strategy=arguments.vapor_strategy, + read_strategy_good_support_threshold=arguments.vapor_read_support_pos_thresh, + read_strategy_bad_support_threshold=arguments.vapor_read_support_neg_thresh, + read_strategy_bad_cov_threshold=arguments.vapor_read_support_neg_cov_thresh + ) + logging.info(f"Samples with confident Vapor variants: {len(vapor_confident_variants)}") + + logging.info("Writing IRS labels...") + get_truth_overlap.output_confident_variants(irs_sample_confident_variants, output_file=arguments.irs_output) + logging.info("Writing Vapor labels...") + get_truth_overlap.output_confident_variants(vapor_confident_variants, output_file=arguments.vapor_output) + return irs_sample_confident_variants, vapor_confident_variants + + +if __name__ == "__main__": + main() diff --git a/src/sv_utils/src/sv_utils/get_truth_overlap.py b/src/sv_utils/src/sv_utils/get_truth_overlap.py index 2284ae7e1..354269e25 100644 --- a/src/sv_utils/src/sv_utils/get_truth_overlap.py +++ b/src/sv_utils/src/sv_utils/get_truth_overlap.py @@ -57,7 +57,7 @@ class Default: ) f_beta = 1.0 min_overlap_cutoff_precision = 0.99 - min_vapor_precision = 0.99 + min_vapor_precision = 0.999 inheritance_af_rareness = 0.05 # when checking overlap, pseudo-size of variants with 0 / >0 reference length = scale_factor * SVLEN + expand_bp: expand_point_svs_bp = interval_overlaps.Default.expand_point_svs_bp diff --git a/src/sv_utils/src/sv_utils/report_confident_irs_vapor_variants.py b/src/sv_utils/src/sv_utils/report_confident_irs_vapor_variants.py new file mode 100755 index 000000000..e5a2aab69 --- /dev/null +++ b/src/sv_utils/src/sv_utils/report_confident_irs_vapor_variants.py @@ -0,0 +1,375 @@ +#!/usr/bin/env python + +import sys +import argparse +import gzip + +from pysam import VariantFile, VariantRecord + +from sv_utils import genomics_io, get_truth_overlap +from typing import List, Text, Optional, Dict, Iterable + + +SIZE_BINS = [50, 500, 5000] +SIZE_BIN_LABELS = ['SMALL', 'MED', 'LARGE'] + + +# non veriant genotypes (taken from svtk.utils) +NULL_GT = {(0, 0), (None, None), (0, ), (None, )} +NON_REF_GTS = {"0/1", "1/1"} + + +def get_confident_variants_vapor(vapor_files: Optional[Dict[str, str]], + precision: float, + valid_variant_ids: set) -> get_truth_overlap.ConfidentVariants: + vapor_info = { + sample_id: + get_truth_overlap.select_confident_vapor_variants(vapor_file=vapor_file, + valid_variant_ids=valid_variant_ids, + precision=precision) + for sample_id, vapor_file in vapor_files.items() + } + return vapor_info + + +def get_called_samples(record: VariantRecord) -> set: + samples = set() + for sample in record.samples.keys(): + if record.samples[sample]['GT'] not in NULL_GT and \ + ('FT' not in record.samples[sample] or record.samples[sample]['FT'] == 'PASS'): + samples.add(sample) + return samples + + +def load_irs_test_report(irs_test_report): + irs_results = genomics_io.tsv_to_pandas(data_file=irs_test_report, require_header=True, header_start='') + irs_results.set_index("ID", inplace=True) + return irs_results + + +def read_list_file(path: str) -> Iterable[str]: + with open(path, 'r') as f: + items = 
[line for line in f.read().splitlines() if line] + if len(items) == 0: + raise ValueError("list empty") + return items + + +def __parse_arguments(argv: List[Text]) -> argparse.Namespace: + # noinspection PyTypeChecker + parser = argparse.ArgumentParser( + description="Get lists of confident variant calls per sample given VaPoR and Array IRS Test inputs", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + prog=argv[0] + ) + parser.add_argument("--vcf", "-v", type=str, + help="GATK-SV VCF file.", required=True) + parser.add_argument("--vapor-json", "-j", type=str, + help="Json file with mapping from sample ID to corresponding VaPoR file.", required=True) + parser.add_argument("--vapor-min-precision", type=float, default=get_truth_overlap.Default.min_vapor_precision, + help="Minimum allowed precision for selecting good or bad variants from VaPoR") + parser.add_argument("--output-summary", type=str, required=True, + help="File to output summary results to") + parser.add_argument("--output-detail", type=str, + help="File (gzipped) to output detail results to") + parser.add_argument("--vapor-max-cnv-size", type=int, default=5000, + help="Maximum size CNV to trust vapor results for") + parser.add_argument("--vapor-read-support-pos-thresh", type=int, default=2, + help="Min Number of supporting vapor reads required for positive example") + parser.add_argument("--irs-sample-batch-lists", type=str, + help="list of lists of samples used in each IRS test batch") + parser.add_argument("--irs-contigs-file", type=str, + help="list of contigs to restrict IRS variants to") + parser.add_argument("--irs-test-report-list", type=str, + help="list of IRS results files") + parser.add_argument("--irs-good-pvalue-threshold", type=float, default=0.000001, + help="Maximum pvalue to choose a good record from the IRS report") + parser.add_argument("--irs-min-probes", type=int, default=5, + help="IRS results file") + parser.add_argument("--irs-min-cnv-size", type=int, default=10000, + help="Minimum size CNV to trust IRS results for") + + parsed_arguments = parser.parse_args(argv[1:] if len(argv) > 1 else ["--help"]) + return parsed_arguments + + +def get_size_bin(svlen): + if svlen > SIZE_BINS[0] and svlen < SIZE_BINS[1]: + return "SMALL" + elif svlen < SIZE_BINS[2]: + return "MED" + else: + return "LARGE" + + +def increment_counts(record, counts_by_svtype, counts_by_svtype_size_bin): + svtype = record.info['SVTYPE'] + svlen = record.info['SVLEN'] + size_bin = get_size_bin(svlen) + if svtype in counts_by_svtype: + counts_by_svtype[svtype] = counts_by_svtype[svtype] + 1 + else: + counts_by_svtype[svtype] = 1 + type_bin_label = svtype + "_" + size_bin + if type_bin_label in counts_by_svtype_size_bin: + counts_by_svtype_size_bin[type_bin_label] = counts_by_svtype_size_bin[type_bin_label] + 1 + else: + counts_by_svtype_size_bin[type_bin_label] = 1 + + +def main(argv: Optional[List[Text]] = None) -> get_truth_overlap.ConfidentVariants: + arguments = __parse_arguments(sys.argv if argv is None else argv) + + if arguments.irs_contigs_file is not None: + irs_contigs = frozenset([line.split("\t")[0] for line in read_list_file(arguments.irs_contigs_file)]) + else: + irs_contigs = None + + vapor_files = get_truth_overlap.get_vapor_files(arguments.vapor_json) + vapor_variants = {sample: genomics_io.vapor_to_pandas(vapor_files[sample]) for sample in vapor_files} + for sample in vapor_variants: + variants_df = vapor_variants[sample] + p_non_ref_old = get_truth_overlap.get_vapor_p_non_ref_old(variants_df) + p_non_ref_genotyped 
= get_truth_overlap.get_vapor_p_non_ref(variants_df) + p_non_ref_read_threshold = \ + get_truth_overlap.get_vapor_p_non_ref_threshold(variants_df, arguments.vapor_read_support_pos_thresh) + variants_df['p_non_ref_old'] = p_non_ref_old['p_non_ref'] + variants_df['p_non_ref_genotyped'] = p_non_ref_genotyped['p_non_ref'] + variants_df['p_non_ref_read_threshold'] = p_non_ref_read_threshold['p_non_ref'] + + if arguments.irs_sample_batch_lists is not None: + sample_list_file_to_report_file_mapping = zip(read_list_file(arguments.irs_sample_batch_lists), + read_list_file(arguments.irs_test_report_list)) + else: + sample_list_file_to_report_file_mapping = {} + + sample_to_irs_report = {} + for sample_list_file_report_pair in sample_list_file_to_report_file_mapping: + sample_list = read_list_file(sample_list_file_report_pair[0]) + report = load_irs_test_report(sample_list_file_report_pair[1]) + for sample in sample_list: + sample_to_irs_report[sample] = report + + vapor_valid_variants = 0 + irs_valid_variants = 0 + + vapor_valid_variants_by_svtype = {} + irs_valid_variants_by_svtype = {} + + vapor_tested_var_gts = 0 + irs_tested_var_gts = 0 + vapor_gq_supported_var_gts = 0 + vapor_gt_supported_var_gts = 0 + vapor_read_supported_var_gts = 0 + irs_supported_var_gts = 0 + + vapor_tested_counts_by_svtype = {} + vapor_tested_counts_by_svtype_size_bin = {} + + vapor_gq_supported_counts_by_svtype = {} + vapor_gq_supported_counts_by_svtype_size_bin = {} + + vapor_gt_supported_counts_by_svtype = {} + vapor_gt_supported_counts_by_svtype_size_bin = {} + + vapor_reads_supported_counts_by_svtype = {} + vapor_reads_supported_counts_by_svtype_size_bin = {} + + irs_tested_counts_by_svtype = {} + irs_tested_counts_by_svtype_size_bin = {} + + irs_supported_counts_by_svtype = {} + irs_supported_counts_by_svtype_size_bin = {} + + with VariantFile(arguments.vcf) as vcf, \ + open(arguments.output_summary, 'w') as out_summary: + if arguments.output_detail is not None: + out_detail = gzip.open(arguments.output_detail, 'wt') + out_detail.write("SVID\tSAMPLE\tSVTYPE\tSVLEN\tCALLED_GT\tCALLED_GQ\tVAPOR_SUPPORT_GT\tVAPOR_SUPPORT_GQ\t" + "VAPOR_SUPPORT_READS\tIRS_PVALUE\n") + else: + out_detail = None + + for record in vcf: + svtype = record.info['SVTYPE'] + vapor_valid = False + irs_valid = False + if not (svtype == 'DEL' or svtype == 'DUP') or record.info['SVLEN'] <= arguments.vapor_max_cnv_size: + vapor_valid = True + vapor_valid_variants = vapor_valid_variants + 1 + if svtype in vapor_valid_variants_by_svtype: + vapor_valid_variants_by_svtype[svtype] = vapor_valid_variants_by_svtype[svtype] + 1 + else: + vapor_valid_variants_by_svtype[svtype] = 1 + if (svtype == 'DEL' or svtype == 'DUP') and record.info['SVLEN'] >= arguments.irs_min_cnv_size \ + and (irs_contigs is None or record.contig in irs_contigs): + irs_valid = True + irs_valid_variants = irs_valid_variants + 1 + if svtype in irs_valid_variants_by_svtype: + irs_valid_variants_by_svtype[svtype] = irs_valid_variants_by_svtype[svtype] + 1 + else: + irs_valid_variants_by_svtype[svtype] = 1 + if vapor_valid or irs_valid: + for sample in get_called_samples(record): + vapor_support = False + vapor_support_gt = 'NA' + vapor_support_gq = 'NA' + vapor_support_reads = 'NA' + if vapor_valid and sample in vapor_variants: + if record.id in vapor_variants[sample].index: + vapor_rec = vapor_variants[sample].loc[record.id] + vapor_tested_var_gts = vapor_tested_var_gts + 1 + increment_counts(record, vapor_tested_counts_by_svtype, vapor_tested_counts_by_svtype_size_bin) + 
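The per-genotype checks that follow compare each of the three p_non_ref estimates computed above against 1 - --vapor-min-precision; a genotype is counted (and reported) as VaPoR-supported if any estimate clears that cutoff. A minimal sketch of the decision, with argument names taken from the columns added above (the pairing of estimates with GQ-, GT-, and read-count-based support is inferred from the counter names, not stated in the source):

    def vapor_supported(p_non_ref_old: float,
                        p_non_ref_genotyped: float,
                        p_non_ref_read_threshold: float,
                        min_precision: float = 0.999) -> bool:
        # With the default min_precision of 0.999 the cutoff is 0.001; a genotype
        # is supported if any of the three estimates exceeds it.
        cutoff = 1.0 - min_precision
        return (p_non_ref_old > cutoff
                or p_non_ref_genotyped > cutoff
                or p_non_ref_read_threshold > cutoff)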
vapor_support_old = vapor_rec['p_non_ref_old'] > 1 - arguments.vapor_min_precision + vapor_support_genotyped = vapor_rec['p_non_ref_genotyped'] > 1 - arguments.vapor_min_precision + vapor_support_read_threshold = vapor_rec['p_non_ref_read_threshold'] > 1 - arguments.vapor_min_precision + if vapor_support_old or vapor_support_genotyped or vapor_support_read_threshold: + vapor_support_gt = vapor_rec[genomics_io.Keys.gt] + vapor_support_gq = vapor_rec[genomics_io.Keys.gq] + vapor_support_reads = vapor_rec[genomics_io.Keys.vapor_read_scores] + if vapor_support_old: + vapor_gq_supported_var_gts = vapor_gq_supported_var_gts + 1 + increment_counts(record, vapor_gq_supported_counts_by_svtype, vapor_gq_supported_counts_by_svtype_size_bin) + if vapor_support_genotyped: + vapor_gt_supported_var_gts = vapor_gt_supported_var_gts + 1 + increment_counts(record, vapor_gt_supported_counts_by_svtype, vapor_gt_supported_counts_by_svtype_size_bin) + if vapor_support_read_threshold: + vapor_read_supported_var_gts = vapor_read_supported_var_gts + 1 + increment_counts(record, vapor_reads_supported_counts_by_svtype, vapor_reads_supported_counts_by_svtype_size_bin) + + irs_support = False + irs_support_pval = 'NA' + if irs_valid and sample in sample_to_irs_report: + irs_data = sample_to_irs_report[sample] + if record.id in irs_data.index: + irs_tested_var_gts = irs_tested_var_gts + 1 + increment_counts(record, irs_tested_counts_by_svtype, irs_tested_counts_by_svtype_size_bin) + irs_rec = irs_data.loc[record.id] + irs_support = irs_rec['NPROBES'] >= arguments.irs_min_probes and irs_rec[ + "PVALUE"] <= arguments.irs_good_pvalue_threshold + if irs_support: + irs_support_pval = irs_rec['PVALUE'] + irs_supported_var_gts = irs_supported_var_gts + 1 + increment_counts(record, irs_supported_counts_by_svtype, + irs_supported_counts_by_svtype_size_bin) + + if out_detail is not None and (vapor_support or irs_support): + out_detail.write("{}\t{}\t{}\t{}\t{}/{}\t{}\t{}\t{}\t{}\t{}\n" + .format(record.id, + sample, + record.info['SVTYPE'], + record.info['SVLEN'], + record.samples[sample]['GT'][0], + record.samples[sample]['GT'][1], + record.samples[sample]['GQ'], + vapor_support_gt, + vapor_support_gq, + vapor_support_reads, + irs_support_pval) + ) + out_summary.write("#Vapor valid variants: {} {}\n#IRS valid variants: {} {}\n" + .format(vapor_valid_variants, + vapor_valid_variants_by_svtype, + irs_valid_variants, + irs_valid_variants_by_svtype) + ) + out_summary.write("CATEGORY\tCOUNT_VAPOR_TESTED_GTS\tCOUNT_VAPOR_GQ_SUPPORTED_GTS\t" + "COUNT_VAPOR_GT_SUPPORTED_GTS\tCOUNT_VAPOR_READ_SUPPORTED_GTS\tCOUNT_IRS_TESTED_GTS\t" + "COUNT_IRS_SUPPORTED_GTS\n") + out_summary.write("{}\t{}\t{}\t{}\t{}\t{}\t{}\n" + .format("ALL", + vapor_tested_var_gts, + vapor_gq_supported_var_gts, + vapor_gt_supported_var_gts, + vapor_read_supported_var_gts, + irs_tested_var_gts, + irs_supported_var_gts) + ) + for svtype in sorted(set(vapor_tested_counts_by_svtype.keys()).union(set(irs_tested_counts_by_svtype.keys()))): + if svtype in vapor_tested_counts_by_svtype: + vapor_tested_count = vapor_tested_counts_by_svtype[svtype] + if svtype in vapor_gq_supported_counts_by_svtype: + vapor_gq_supported_count = vapor_gq_supported_counts_by_svtype[svtype] + else: + vapor_gq_supported_count = 0 + if svtype in vapor_gt_supported_counts_by_svtype: + vapor_gt_supported_count = vapor_gt_supported_counts_by_svtype[svtype] + else: + vapor_gt_supported_count = 0 + if svtype in vapor_reads_supported_counts_by_svtype: + vapor_reads_supported_count = 
vapor_reads_supported_counts_by_svtype[svtype] + else: + vapor_reads_supported_count = 0 + else: + vapor_tested_count = 0 + vapor_gq_supported_count = 0 + vapor_gt_supported_count = 0 + vapor_reads_supported_count = 0 + if svtype in irs_tested_counts_by_svtype: + irs_tested_count = irs_tested_counts_by_svtype[svtype] + if svtype in irs_supported_counts_by_svtype: + irs_supported_count = irs_supported_counts_by_svtype[svtype] + else: + irs_supported_count = 0 + else: + irs_tested_count = 0 + irs_supported_count = 0 + out_summary.write("{}\t{}\t{}\t{}\t{}\t{}\t{}\n" + .format(svtype, + vapor_tested_count, + vapor_gq_supported_count, + vapor_gt_supported_count, + vapor_reads_supported_count, + irs_tested_count, + irs_supported_count) + ) + for size_bin in SIZE_BIN_LABELS: + svtype_bin = svtype + "_" + size_bin + if svtype_bin in vapor_tested_counts_by_svtype_size_bin \ + or svtype_bin in irs_tested_counts_by_svtype_size_bin: + if svtype_bin in vapor_tested_counts_by_svtype_size_bin: + vapor_tested = vapor_tested_counts_by_svtype_size_bin[svtype_bin] + if svtype_bin in vapor_gq_supported_counts_by_svtype_size_bin: + vapor_gq_supported = vapor_gq_supported_counts_by_svtype_size_bin[svtype_bin] + else: + vapor_gq_supported = 0 + if svtype_bin in vapor_gt_supported_counts_by_svtype_size_bin: + vapor_gt_supported = vapor_gt_supported_counts_by_svtype_size_bin[svtype_bin] + else: + vapor_gt_supported = 0 + if svtype_bin in vapor_reads_supported_counts_by_svtype_size_bin: + vapor_reads_supported = vapor_reads_supported_counts_by_svtype_size_bin[svtype_bin] + else: + vapor_reads_supported = 0 + + else: + vapor_tested = 0 + vapor_gq_supported = 0 + vapor_gt_supported = 0 + vapor_reads_supported = 0 + + if svtype_bin in irs_tested_counts_by_svtype_size_bin: + irs_tested = irs_tested_counts_by_svtype_size_bin[svtype_bin] + if svtype_bin in irs_supported_counts_by_svtype_size_bin: + irs_supported = irs_supported_counts_by_svtype_size_bin[svtype_bin] + else: + irs_supported = 0 + else: + irs_tested = 0 + irs_supported = 0 + + out_summary.write("{}\t{}\t{}\t{}\t{}\t{}\t{}\n" + .format(svtype_bin, + vapor_tested, + vapor_gq_supported, + vapor_gt_supported, + vapor_reads_supported, + irs_tested, + irs_supported) + ) + if out_detail is not None: + out_detail.close() + + +if __name__ == "__main__": + main() diff --git a/wdl/BenchmarkGqFilter.wdl b/wdl/BenchmarkGqFilter.wdl index ad6fa82df..d5857ee9c 100644 --- a/wdl/BenchmarkGqFilter.wdl +++ b/wdl/BenchmarkGqFilter.wdl @@ -153,7 +153,7 @@ task BenchmarkFilter { String benchmark_figure_filename = "quality-benchmark.pdf" Array[String] benchmark_args = [] String sv_utils_docker - Int num_entries + Float num_entries Float pickled_files_size } diff --git a/wdl/ConcatTextFiles.wdl b/wdl/ConcatTextFiles.wdl new file mode 100644 index 000000000..1a8755e4c --- /dev/null +++ b/wdl/ConcatTextFiles.wdl @@ -0,0 +1,43 @@ +version 1.0 + +import "TasksMakeCohortVcf.wdl" as tasks + +workflow ConcatTextFiles { + + input { + Array[File] text_files + String output_prefix + String output_suffix = "concat.txt" + + Boolean gzipped = false + Boolean headered = false + + String linux_docker + String sv_base_mini_docker + } + + if (!headered) { + # Disable filter command since input might be compressed + call tasks.CatUncompressedFiles { + input: + shards=text_files, + outfile_name="~{output_prefix}.~{output_suffix}", + filter_command="", + sv_base_mini_docker=sv_base_mini_docker + } + } + + if (headered) { + call tasks.ConcatHeaderedTextFiles { + input: + text_files=text_files, + 
gzipped=gzipped, + output_filename="~{output_prefix}.~{output_suffix}", + linux_docker=linux_docker + } + } + + output { + File concatenated_files = select_first([ConcatHeaderedTextFiles.out, CatUncompressedFiles.outfile]) + } +} diff --git a/wdl/FilterGenotypes.wdl b/wdl/FilterGenotypes.wdl index dc19c5e3b..0603b5750 100644 --- a/wdl/FilterGenotypes.wdl +++ b/wdl/FilterGenotypes.wdl @@ -8,7 +8,7 @@ import "TasksMakeCohortVcf.wdl" as tasks_cohort workflow FilterGenotypes { input { - File vcf # Cleaned GATK-formatted vcf + File vcf # Cleaned GATK-formatted vcf run through SVConcordance String? output_prefix File ploidy_table @@ -16,10 +16,10 @@ workflow FilterGenotypes { Array[String] recalibrate_gq_args = [] Array[File] genome_tracks = [] Float no_call_rate_cutoff = 0.05 # Set to 1 to disable NCR filtering - Float fmax_beta = 0.5 # Recommended range [0.3, 0.5] (use lower values for higher specificity) + Float fmax_beta = 0.4 # Recommended range [0.3, 0.5] (use lower values for higher specificity) # One of the following must be provided - File? truth_json # If given, SL cutoffs will be automatically optimized. Overrides sl_filter_args. TODO: UNIMPLEMENTED! + File? truth_json # If given, SL cutoffs will be automatically optimized. Overrides sl_filter_args. String? sl_filter_args # Explicitly set SL cutoffs. See apply_sl_filter.py for arguments. Int optimize_vcf_records_per_shard = 50000 @@ -76,15 +76,16 @@ workflow FilterGenotypes { sv_pipeline_docker=sv_pipeline_docker } } - call MergeCompressedHeaderedTables { + call tasks_cohort.ConcatHeaderedTextFiles { input: - tables=MakeVcfTable.out, - output_prefix="~{output_prefix_}.vcf_table", + text_files=MakeVcfTable.out, + gzipped=true, + output_filename="~{output_prefix_}.vcf_table.tsv.gz", linux_docker=linux_docker } call OptimizeCutoffs { input: - table=MergeCompressedHeaderedTables.out, + table=ConcatHeaderedTextFiles.out, fmax_beta=fmax_beta, output_prefix="~{output_prefix_}.sl_optimization", sv_pipeline_docker=sv_pipeline_docker @@ -146,60 +147,13 @@ workflow FilterGenotypes { File? main_vcf_qc_tarball = MainVcfQc.sv_vcf_qc_output # For optional analysis - File? vcf_optimization_table = MergeCompressedHeaderedTables.out + File? vcf_optimization_table = ConcatHeaderedTextFiles.out File? sl_cutoff_qc_tarball = OptimizeCutoffs.out File unfiltered_recalibrated_vcf = RecalibrateGq.filtered_vcf File unfiltered_recalibrated_vcf_index = RecalibrateGq.filtered_vcf_index } } - -task MergeCompressedHeaderedTables { - input { - Array[File] tables - String output_prefix - String linux_docker - RuntimeAttr? 
runtime_attr_override - } - - RuntimeAttr default_attr = object { - cpu_cores: 1, - mem_gb: 1, - disk_gb: ceil(10 + 2 * size(tables, "GB")), - boot_disk_gb: 10, - preemptible_tries: 1, - max_retries: 1 - } - RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) - - output { - File out = "~{output_prefix}.tsv.gz" - } - command <<< - set -euo pipefail - OUT_FILE="~{output_prefix}.tsv.gz" - i=0 - while read path; do - if [ $i == 0 ]; then - # Get header from first line of first file - zcat $path | awk 'NR==1' - | gzip > $OUT_FILE - fi - # Get data from each file, skipping header line - zcat $path | awk 'NR>1' - | gzip >> $OUT_FILE - i=$((i+1)) - done < ~{write_lines(tables)} - >>> - runtime { - cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) - memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" - disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" - bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) - docker: linux_docker - preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) - maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) - } -} - task MakeVcfTable { input { File vcf diff --git a/wdl/MakeGqRecalibratorTrainingSetFromPacBio.wdl b/wdl/MakeGqRecalibratorTrainingSetFromPacBio.wdl new file mode 100644 index 000000000..1c75e496b --- /dev/null +++ b/wdl/MakeGqRecalibratorTrainingSetFromPacBio.wdl @@ -0,0 +1,589 @@ +version 1.0 + +import "Structs.wdl" +import "SVConcordancePacBioSample.wdl" as concordance +import "Utils.wdl" as utils +import "TasksMakeCohortVcf.wdl" as mini_tasks + +workflow MakeGqRecalibratorTrainingSetFromPacBio { + + input { + # Cleaned GATK-formatted vcf + # SVConcordance should be run first if the training set is a proper subset of the cohort + # This can be either a single whole-genome vcf or multiple vcf shards. + # Assumes all vcfs have indexes, i.e. at {VCF_PATH}.tbi + Array[File] vcfs + + File training_sample_ids # Sample IDs with PacBio or array data + String? output_prefix + File ploidy_table + + Array[String] pacbio_sample_ids # Corresponding to files below (must be a subset of training_sample_ids) + Array[File] vapor_files + Array[File] pbsv_vcfs + Array[File] pav_vcfs + Array[File] sniffles_vcfs + + # Optional: array intensity ratio files + Array[File]? irs_sample_batches + Array[File]? irs_test_reports + File? irs_contigs_fai + + Boolean write_detail_report = true + Int? vapor_max_cnv_size = 5000 + Float? vapor_min_precision = 0.999 + Int? vapor_pos_read_threshold = 2 + Int? irs_min_cnv_size = 10000 + Float? irs_good_pvalue_threshold = 0.000001 + Int? irs_min_probes = 5 + + Float? pesr_interval_overlap_strict = 0.1 + Float? pesr_size_similarity_strict = 0.5 + Int? pesr_breakend_window_strict = 5000 + + Float? pesr_interval_overlap_loose = 0 + Float? pesr_size_similarity_loose = 0 + Int? 
pesr_breakend_window_loose = 5000 + + File reference_dict + + String sv_utils_docker + String gatk_docker + String sv_base_mini_docker + String sv_pipeline_docker + String linux_docker + } + + Array[String] tool_names = ["pbsv", "pav", "sniffles"] + Array[Array[File]] pacbio_vcfs = transpose([pbsv_vcfs, pav_vcfs, sniffles_vcfs]) + + String output_prefix_ = + if defined(output_prefix) then + select_first([output_prefix]) + else + basename(vcfs[0], ".vcf.gz") + + scatter (i in range(length(vcfs))) { + call utils.SubsetVcfBySamplesList as SubsetTrainingSamples { + input: + vcf = vcfs[i], + vcf_idx = vcfs[i] + ".tbi", + list_of_samples = training_sample_ids, + outfile_name = "~{output_prefix_}.training_samples.shard_~{i}.vcf.gz", + remove_samples = false, + remove_private_sites = true, + keep_af = true, + sv_base_mini_docker = sv_base_mini_docker + } + } + + call mini_tasks.ConcatVcfs as ConcatTrainingSampleVcfs { + input: + vcfs=SubsetTrainingSamples.vcf_subset, + vcfs_idx=SubsetTrainingSamples.vcf_subset_index, + naive=true, + outfile_prefix="~{output_prefix_}.concat_training_sample_vcfs", + sv_base_mini_docker=sv_base_mini_docker + } + + call GetVariantListsFromVaporAndIRS { + input: + vcf=ConcatTrainingSampleVcfs.concat_vcf, + output_prefix=output_prefix_, + vapor_sample_ids=pacbio_sample_ids, + vapor_files=vapor_files, + irs_sample_batches=select_first([irs_sample_batches, []]), + irs_test_reports=select_first([irs_test_reports, []]), + irs_contigs_fai=irs_contigs_fai, + vapor_max_cnv_size=vapor_max_cnv_size, + vapor_min_precision=vapor_min_precision, + vapor_pos_read_threshold=vapor_pos_read_threshold, + irs_min_cnv_size=irs_min_cnv_size, + irs_good_pvalue_threshold=irs_good_pvalue_threshold, + irs_min_probes=irs_min_probes, + sv_utils_docker=sv_utils_docker + } + + call VaporAndIRSSupportReport { + input: + vcf=ConcatTrainingSampleVcfs.concat_vcf, + output_prefix=output_prefix_, + vapor_sample_ids=pacbio_sample_ids, + vapor_files=vapor_files, + write_detail_report=write_detail_report, + irs_sample_batches=select_first([irs_sample_batches, []]), + irs_test_reports=select_first([irs_test_reports, []]), + irs_contigs_fai=irs_contigs_fai, + vapor_max_cnv_size=vapor_max_cnv_size, + vapor_min_precision=vapor_min_precision, + vapor_pos_read_threshold=vapor_pos_read_threshold, + irs_min_cnv_size=irs_min_cnv_size, + irs_good_pvalue_threshold=irs_good_pvalue_threshold, + irs_min_probes=irs_min_probes, + sv_utils_docker=sv_utils_docker + } + + call utils.WriteLines as WritePacBioSampleIds { + input: + lines = pacbio_sample_ids, + output_filename = "~{output_prefix_}.pacbio_sample_ids.list", + linux_docker = linux_docker + } + + scatter (i in range(length(vcfs))) { + call utils.SubsetVcfBySamplesList as SubsetPacBioSamples { + input: + vcf = vcfs[i], + vcf_idx = vcfs[i] + ".tbi", + list_of_samples = WritePacBioSampleIds.out, + outfile_name = "~{output_prefix_}.pacbio_samples.shard_~{i}.vcf.gz", + remove_samples = false, + remove_private_sites = true, + keep_af = true, + sv_base_mini_docker = sv_base_mini_docker + } + } + + call mini_tasks.ConcatVcfs as ConcatPacbioSampleVcfs { + input: + vcfs=SubsetPacBioSamples.vcf_subset, + vcfs_idx=SubsetPacBioSamples.vcf_subset_index, + naive=true, + outfile_prefix="~{output_prefix_}.concat_pacbio_sample_vcfs", + sv_base_mini_docker=sv_base_mini_docker + } + + scatter (i in range(length(pacbio_sample_ids))) { + call PrepSampleVcf { + input: + sample_id=pacbio_sample_ids[i], + vcf=ConcatPacbioSampleVcfs.concat_vcf, + 
vcf_index=ConcatPacbioSampleVcfs.concat_vcf_idx, + output_prefix=output_prefix_, + sv_pipeline_docker=sv_pipeline_docker + } + call concordance.SVConcordancePacBioSample as SVConcordanceLoose { + input: + sample_id=pacbio_sample_ids[i], + sample_vcf=PrepSampleVcf.out, + pacbio_sample_vcfs=pacbio_vcfs[i], + tool_names=tool_names, + prefix="~{output_prefix_}.loose", + ploidy_table=ploidy_table, + reference_dict=reference_dict, + pesr_interval_overlap=pesr_interval_overlap_loose, + pesr_size_similarity=pesr_size_similarity_loose, + pesr_breakend_window=pesr_breakend_window_loose, + sv_base_mini_docker=sv_base_mini_docker, + sv_pipeline_docker=sv_pipeline_docker, + gatk_docker=gatk_docker, + linux_docker=linux_docker + } + call concordance.SVConcordancePacBioSample as SVConcordanceStrict { + input: + sample_id=pacbio_sample_ids[i], + sample_vcf=PrepSampleVcf.out, + pacbio_sample_vcfs=pacbio_vcfs[i], + tool_names=tool_names, + prefix="~{output_prefix_}.strict", + ploidy_table=ploidy_table, + reference_dict=reference_dict, + pesr_interval_overlap=pesr_interval_overlap_strict, + pesr_size_similarity=pesr_size_similarity_strict, + pesr_breakend_window=pesr_breakend_window_strict, + sv_base_mini_docker=sv_base_mini_docker, + sv_pipeline_docker=sv_pipeline_docker, + gatk_docker=gatk_docker, + linux_docker=linux_docker + } + call RefineSampleLabels { + input: + sample_id=pacbio_sample_ids[i], + main_vcf=PrepSampleVcf.out, + vapor_json=GetVariantListsFromVaporAndIRS.vapor_json, + tool_names=tool_names, + loose_concordance_vcfs=SVConcordanceLoose.pacbio_concordance_vcfs, + strict_concordance_vcfs=SVConcordanceStrict.pacbio_concordance_vcfs, + output_prefix="~{output_prefix_}.gq_training_labels.~{pacbio_sample_ids[i]}", + sv_pipeline_docker=sv_pipeline_docker + } + } + + call MergeJsons { + input: + jsons=flatten([[GetVariantListsFromVaporAndIRS.irs_json], RefineSampleLabels.out_json]), + output_prefix="~{output_prefix_}.gq_training_labels", + sv_pipeline_docker=sv_pipeline_docker + } + + call mini_tasks.ConcatHeaderedTextFiles { + input: + text_files=RefineSampleLabels.out_table, + gzipped=false, + output_filename="~{output_prefix_}.gq_training_labels.tsv", + linux_docker=linux_docker + } + + output { + File gq_recalibrator_training_json = MergeJsons.out + File pacbio_support_summary_table = ConcatHeaderedTextFiles.out + + File training_sample_vcf = ConcatTrainingSampleVcfs.concat_vcf + File training_sample_vcf_index = ConcatTrainingSampleVcfs.concat_vcf_idx + File pacbio_sample_vcf = ConcatPacbioSampleVcfs.concat_vcf + File pacbio_sample_vcf_index = ConcatPacbioSampleVcfs.concat_vcf_idx + + File vapor_output_json = GetVariantListsFromVaporAndIRS.vapor_json + File irs_output_json = GetVariantListsFromVaporAndIRS.irs_json + File vapor_and_irs_summary_report = VaporAndIRSSupportReport.summary + File vapor_and_irs_detail_report = VaporAndIRSSupportReport.detail + Array[File] loose_pacbio_concordance_vcf_tars = SVConcordanceLoose.pacbio_concordance_vcfs_tar + Array[File] strict_pacbio_concordance_vcf_tars = SVConcordanceStrict.pacbio_concordance_vcfs_tar + } +} + +task GetVariantListsFromVaporAndIRS { + input { + File vcf + String output_prefix + Array[String] vapor_sample_ids + Array[File] vapor_files + + Array[File] irs_sample_batches + Array[File] irs_test_reports + File? irs_contigs_fai + + Int? vapor_max_cnv_size + Float? vapor_min_precision + String vapor_strategy = "READS" + Int? vapor_pos_read_threshold + Int? vapor_neg_read_threshold + Int? vapor_neg_cov_read_threshold + Int? 
irs_min_cnv_size + Float? irs_good_pvalue_threshold + Float? irs_bad_pvalue_threshold + Int? irs_min_probes + + File? script + String? additional_args + + String sv_utils_docker + RuntimeAttr? runtime_attr_override + } + + output { + File vapor_json = "~{output_prefix}.gq_recalibrator_labels.vapor.json" + File irs_json = "~{output_prefix}.gq_recalibrator_labels.irs.json" + } + + command <<< + set -euxo pipefail + + # JSON preprocessing taken from GetTruthOverlap.wdl + # construct a vapor JSON file if vapor data was passed + VAPOR_SAMPLE_IDS=~{write_lines(vapor_sample_ids)} + VAPOR_FILES=~{write_lines(vapor_files)} + # this is all horrible, but it's just taking text processing to turn the sample IDs and vapor files arrays + # into a json file that contains a map with sample IDs as keys, and corresponding vapor files as values + { + echo '{' + paste -d: $VAPOR_SAMPLE_IDS $VAPOR_FILES \ + | sed -e 's/^/\"/' -e 's/:/\":\"/' -e 's/$/\",/' + echo '}' + } \ + | tr -d '\n' \ + | sed 's/,}/}/' \ + | sed -e 's/\(,\|{\)/\1\n/g' -e 's/"}/"\n}\n/' \ + | sed 's/^"/ "/g' \ + > vapor_data.json + printf "vapor_data.json: " + cat vapor_data.json + + IRS_SAMPLE_BATCHES=~{write_lines(irs_sample_batches)} + IRS_TEST_REPORTS=~{write_lines(irs_test_reports)} + python ~{default="/opt/sv_utils/src/sv_utils/get_confident_variant_lists_from_vapor_and_irs_test.py" script} \ + --vapor-json vapor_data.json \ + --vcf ~{vcf} \ + ~{if length(irs_sample_batches) > 0 then "--irs-sample-batch-lists $IRS_SAMPLE_BATCHES" else ""} \ + ~{if length(irs_test_reports) > 0 then "--irs-test-report-list $IRS_TEST_REPORTS" else ""} \ + ~{"--irs-contigs-file " + irs_contigs_fai} \ + ~{"--vapor-max-cnv-size " + vapor_max_cnv_size} \ + ~{"--vapor-min-precision " + vapor_min_precision} \ + ~{"--irs-min-cnv-size " + irs_min_cnv_size} \ + ~{"--irs-min-probes " + irs_min_probes} \ + ~{"--irs-good-pvalue-threshold " + irs_good_pvalue_threshold} \ + ~{"--irs-bad-pvalue-threshold " + irs_bad_pvalue_threshold} \ + --vapor-strategy ~{vapor_strategy} \ + ~{"--vapor-read-support-pos-thresh " + vapor_pos_read_threshold} \ + ~{"--vapor-read-support-neg-thresh " + vapor_neg_read_threshold} \ + ~{"--vapor-read-support-neg-cov-thresh " + vapor_neg_cov_read_threshold} \ + --irs-output ~{output_prefix}.gq_recalibrator_labels.irs.json \ + --vapor-output ~{output_prefix}.gq_recalibrator_labels.vapor.json \ + ~{additional_args} + >>> + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 4, + boot_disk_gb: 10, + preemptible_tries: 3, + max_retries: 1, + disk_gb: 25 + ceil(size([ + vcf], "GiB")) + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_utils_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} + +task PrepSampleVcf { + input { + String sample_id + File vcf + File vcf_index + File? script + String output_prefix + String sv_pipeline_docker + RuntimeAttr? 
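The sed pipeline in the command block above only serializes the paired sample-ID and VaPoR-file arrays into a JSON map keyed by sample ID. A hypothetical Python equivalent of that preprocessing step (shown here purely for clarity; the task keeps the shell version):

    import json

    def write_vapor_json(sample_ids, vapor_files, out_path="vapor_data.json"):
        # Pair each sample ID with its VaPoR file and emit a JSON object of the
        # form {"sample_1": "/path/to/sample_1.vapor.gz", ...}.
        with open(out_path, "w") as f:
            json.dump(dict(zip(sample_ids, vapor_files)), f, indent=2)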
runtime_attr_override + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 3.75, + disk_gb: ceil(50 + size(vcf, "GB")), + boot_disk_gb: 10, + preemptible_tries: 1, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + output { + File out = "~{output_prefix}.~{sample_id}.prepped_for_pb.vcf.gz" + File out_index = "~{output_prefix}.~{sample_id}.prepped_for_pb.vcf.gz.tbi" + } + command <<< + set -euxo pipefail + + # Subset to DEL/DUP/INS and convert DUP to INS + bcftools view -s "~{sample_id}" ~{vcf} | bcftools view --no-update --min-ac 1 -Oz -o tmp1.vcf.gz + python ~{default="/opt/sv-pipeline/scripts/preprocess_gatk_for_pacbio_eval.py" script} tmp1.vcf.gz \ + | bcftools sort -Oz -o ~{output_prefix}.~{sample_id}.prepped_for_pb.vcf.gz + tabix ~{output_prefix}.~{sample_id}.prepped_for_pb.vcf.gz + >>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_pipeline_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} + + +task VaporAndIRSSupportReport { + input { + File vcf + String output_prefix + Array[String] vapor_sample_ids + Array[File] vapor_files + Array[File] irs_sample_batches + Array[File] irs_test_reports + Boolean write_detail_report + File? irs_contigs_fai + Int? vapor_max_cnv_size + Float? vapor_min_precision + Int? vapor_pos_read_threshold + Int? irs_min_cnv_size + Float? irs_good_pvalue_threshold + Int? irs_min_probes + + File? script + + String sv_utils_docker + RuntimeAttr? 
runtime_attr_override + } + + output { + File summary = "${output_prefix}.irs_vapor_support.summary.tsv" + File detail = "${output_prefix}.irs_vapor_support.detail.tsv.gz" + } + + command <<< + set -euxo pipefail + + # JSON preprocessing taken from GetTruthOverlap.wdl + # construct a vapor JSON file if vapor data was passed + VAPOR_SAMPLE_IDS=~{write_lines(vapor_sample_ids)} + VAPOR_FILES=~{write_lines(vapor_files)} + # this is all horrible, but it's just taking text processing to turn the sample IDs and vapor files arrays + # into a json file that contains a map with sample IDs as keys, and corresponding vapor files as values + { + echo '{' + paste -d: $VAPOR_SAMPLE_IDS $VAPOR_FILES \ + | sed -e 's/^/\"/' -e 's/:/\":\"/' -e 's/$/\",/' + echo '}' + } \ + | tr -d '\n' \ + | sed 's/,}/}/' \ + | sed -e 's/\(,\|{\)/\1\n/g' -e 's/"}/"\n}\n/' \ + | sed 's/^"/ "/g' \ + > vapor_data.json + printf "vapor_data.json: " + cat vapor_data.json + + touch ~{output_prefix}.irs_vapor_support.detail.tsv.gz + IRS_SAMPLE_BATCHES=~{write_lines(irs_sample_batches)} + IRS_TEST_REPORTS=~{write_lines(irs_test_reports)} + python ~{default="/opt/sv_utils/src/sv_utils/report_confident_irs_vapor_variants.py" script} \ + --vapor-json vapor_data.json \ + --vcf ~{vcf} \ + ~{if length(irs_sample_batches) > 0 then "--irs-sample-batch-lists $IRS_SAMPLE_BATCHES" else ""} \ + ~{if length(irs_test_reports) > 0 then "--irs-test-report-list $IRS_TEST_REPORTS" else ""} \ + ~{"--irs-contigs-file " + irs_contigs_fai} \ + ~{"--vapor-max-cnv-size " + vapor_max_cnv_size} \ + ~{"--vapor-min-precision " + vapor_min_precision} \ + ~{"--irs-min-cnv-size " + irs_min_cnv_size} \ + ~{"--irs-min-probes " + irs_min_probes} \ + ~{"--irs-good-pvalue-threshold " + irs_good_pvalue_threshold} \ + --output-summary ~{output_prefix}.irs_vapor_support.summary.tsv \ + ~{"--vapor-read-support-pos-thresh " + vapor_pos_read_threshold} \ + ~{if write_detail_report then "--output-detail ~{output_prefix}.irs_vapor_support.detail.tsv.gz" else ""} + >>> + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 16, + boot_disk_gb: 10, + preemptible_tries: 1, + max_retries: 1, + disk_gb: 100 + ceil(size([ + vcf], "GiB")) * 2 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_utils_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} + + +task RefineSampleLabels { + input { + String sample_id + File main_vcf + File vapor_json + Array[String] tool_names + Array[File] loose_concordance_vcfs + Array[File] strict_concordance_vcfs + String? additional_args + File? script + String output_prefix + String sv_pipeline_docker + RuntimeAttr? 
runtime_attr_override + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 3.75, + disk_gb: 10, + boot_disk_gb: 10, + preemptible_tries: 3, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + String additional_args_ = if defined(additional_args) then select_first([additional_args]) else "" + + output { + File out_json = "~{output_prefix}.json" + File out_table = "~{output_prefix}.tsv" + } + command <<< + set -euo pipefail + python ~{default="/opt/sv-pipeline/scripts/refine_training_set.py" script} \ + --loose-concordance-vcfs ~{sep=" " loose_concordance_vcfs} \ + --strict-concordance-vcfs ~{sep=" " strict_concordance_vcfs} \ + --main-vcf ~{main_vcf} \ + --vapor-json ~{vapor_json} \ + --sample-id ~{sample_id} \ + --json-out ~{output_prefix}.json \ + --table-out ~{output_prefix}.tsv \ + --truth-algorithms ~{sep="," tool_names} \ + ~{additional_args_} + >>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_pipeline_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} + +task MergeJsons { + input { + Array[File] jsons + String output_prefix + String sv_pipeline_docker + RuntimeAttr? runtime_attr_override + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 7.5, + disk_gb: ceil(10 + size(jsons, "GB") * 2), + boot_disk_gb: 10, + preemptible_tries: 1, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + output { + File out = "~{output_prefix}.json" + } + command <<< + set -euo pipefail + python3 <>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_pipeline_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} diff --git a/wdl/PickleVcfProperties.wdl b/wdl/PickleVcfProperties.wdl index 9af92bab5..a66908590 100644 --- a/wdl/PickleVcfProperties.wdl +++ b/wdl/PickleVcfProperties.wdl @@ -35,7 +35,7 @@ workflow PickleVcfProperties { output { Int num_records = GetVcfSize.num_records Int num_samples = GetVcfSize.num_samples - Int num_entries = GetVcfSize.num_entries + Float num_entries = GetVcfSize.num_entries File pickled_properties = ReadAndPickleProperties.pickled_properties } } @@ -43,7 +43,7 @@ workflow PickleVcfProperties { task GetNeededMemGB { input { Array[String] wanted_properties - Int num_entries + Float num_entries } Float mem_gb = "1.5" diff --git a/wdl/SVConcordancePacBioSample.wdl b/wdl/SVConcordancePacBioSample.wdl new file mode 100644 index 000000000..2f962c09d --- /dev/null +++ b/wdl/SVConcordancePacBioSample.wdl @@ -0,0 +1,140 @@ +version 1.0 + +import "Structs.wdl" +import "SVConcordance.wdl" as conc +import "TasksMakeCohortVcf.wdl" as tasks_cohort +import "Utils.wdl" as utils + +workflow 
SVConcordancePacBioSample { + input { + String sample_id # Sample IDs will be renamed to this in the PacBio vcfs + File sample_vcf # Single-sample vcf, usually derived from the cleaned vcf. Sample ID should match sample_id. + Array[File] pacbio_sample_vcfs # Raw single-sample vcfs from each tool. Sample ID will be changed to sample_id. + Array[String] tool_names # Names of PacBio tools in same order as pacbio_sample_vcfs + String prefix + File ploidy_table + + Int? pacbio_min_size + + Float? pesr_interval_overlap + Float? pesr_size_similarity + Int? pesr_breakend_window + + File reference_dict + + # For debugging + File? preprocess_script + + String gatk_docker + String sv_base_mini_docker + String sv_pipeline_docker + String linux_docker + + Float? java_mem_fraction + + RuntimeAttr? runtime_attr_combine_truth + RuntimeAttr? runtime_attr_sv_concordance + RuntimeAttr? runtime_attr_sort_vcf + RuntimeAttr? runtime_attr_tar + } + + scatter (i in range(length(tool_names))) { + call PrepPacBioVcf { + input: + sample_id=sample_id, + vcf=pacbio_sample_vcfs[i], + min_size=pacbio_min_size, + truth_tool_name=tool_names[i], + ploidy_table=ploidy_table, + output_prefix="~{prefix}.prep_~{tool_names[i]}_vcf.~{sample_id}", + script=preprocess_script, + sv_pipeline_docker=sv_pipeline_docker, + runtime_attr_override=runtime_attr_combine_truth + } + call conc.SVConcordanceTask { + input: + truth_vcf=PrepPacBioVcf.out, + eval_vcf=sample_vcf, + output_prefix="~{prefix}.concordance.~{tool_names[i]}.~{sample_id}.unsorted", + additional_args="--pesr-interval-overlap ~{pesr_interval_overlap} --pesr-size-similarity ~{pesr_size_similarity} --pesr-breakend-window ~{pesr_breakend_window}", + reference_dict=reference_dict, + java_mem_fraction=java_mem_fraction, + gatk_docker=gatk_docker, + runtime_attr_override=runtime_attr_sv_concordance + } + call tasks_cohort.SortVcf { + input: + vcf=SVConcordanceTask.out_unsorted, + outfile_prefix="~{prefix}.concordance.~{tool_names[i]}.~{sample_id}.sorted", + sv_base_mini_docker=sv_base_mini_docker, + runtime_attr_override=runtime_attr_sort_vcf + } + } + + call utils.TarFiles { + input: + files=flatten([SortVcf.out, SortVcf.out_index]), + prefix="~{prefix}.pacbio_concordance_vcfs.~{sample_id}", + linux_docker=linux_docker, + runtime_attr_override=runtime_attr_tar + } + + output { + Array[File] pacbio_concordance_vcfs = SortVcf.out + File pacbio_concordance_vcfs_tar = TarFiles.out + } +} + +task PrepPacBioVcf { + input { + String sample_id + File vcf + String truth_tool_name + Int min_size = 25 + File ploidy_table + File? script + String output_prefix + String sv_pipeline_docker + RuntimeAttr? 
runtime_attr_override + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 1.5, + disk_gb: ceil(10 + 3 * size(vcf, "GB")), + boot_disk_gb: 10, + preemptible_tries: 3, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + output { + File out = "~{output_prefix}.vcf.gz" + File out_index = "~{output_prefix}.vcf.gz.tbi" + } + command <<< + set -euxo pipefail + + # Convert format + bcftools reheader -s <(echo "~{sample_id}") ~{vcf} > tmp1.vcf.gz + python ~{default="/opt/sv-pipeline/scripts/format_pb_for_gatk.py" script} \ + --vcf tmp1.vcf.gz \ + --algorithm ~{truth_tool_name} \ + --min-size ~{min_size} \ + --out tmp2.vcf.gz \ + --ploidy-table ~{ploidy_table} + bcftools sort tmp2.vcf.gz \ + | bcftools annotate --set-id '~{truth_tool_name}_%CHROM\_%POS\_%END\_%SVTYPE\_%SVLEN' -Oz -o ~{output_prefix}.vcf.gz + tabix ~{output_prefix}.vcf.gz + >>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_pipeline_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} + diff --git a/wdl/TasksMakeCohortVcf.wdl b/wdl/TasksMakeCohortVcf.wdl index 1cf9237d7..d489831e8 100644 --- a/wdl/TasksMakeCohortVcf.wdl +++ b/wdl/TasksMakeCohortVcf.wdl @@ -118,6 +118,57 @@ task CatUncompressedFiles { } } + +task ConcatHeaderedTextFiles { + input { + Array[File] text_files + Boolean gzipped = false + String output_filename + String linux_docker + RuntimeAttr? 
runtime_attr_override + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 1, + disk_gb: ceil(10 + 2 * size(text_files, "GB")), + boot_disk_gb: 10, + preemptible_tries: 1, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + String cat_command = if gzipped then "zcat" else "cat" + String compress_command = if gzipped then "| gzip" else "" + + output { + File out = "~{output_filename}" + } + command <<< + set -euo pipefail + OUT_FILE="~{output_filename}" + i=0 + while read path; do + if [ $i == 0 ]; then + # Get header from first line of first file + ~{cat_command} $path | awk 'NR==1' ~{compress_command} > $OUT_FILE + fi + # Get data from each file, skipping header line + ~{cat_command} $path | awk 'NR>1' ~{compress_command} >> $OUT_FILE + i=$((i+1)) + done < ~{write_lines(text_files)} + >>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: linux_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} + task SortVcf { input { File vcf diff --git a/wdl/TrainGqRecalibrator.wdl b/wdl/TrainGqRecalibrator.wdl index f7999813b..26afa7fbd 100644 --- a/wdl/TrainGqRecalibrator.wdl +++ b/wdl/TrainGqRecalibrator.wdl @@ -12,6 +12,7 @@ workflow TrainGqRecalibrator { Array[String] train_args = [] String gatk_docker String samtools_cloud_docker + Int? preemptible_tries } call Utils.GetVcfSize { @@ -30,7 +31,8 @@ workflow TrainGqRecalibrator { gq_recalibrator_model_file=gq_recalibrator_model_file, train_args=train_args, gatk_docker=gatk_docker, - num_entries=GetVcfSize.num_entries + num_entries=GetVcfSize.num_entries, + preemptible_tries=preemptible_tries } output { @@ -48,10 +50,11 @@ task TrainGqRecalibratorTask { File? gq_recalibrator_model_file # can be passed to do extra rounds of training on existing model Array[String] train_args = [] String gatk_docker - Int? num_entries + Float? num_entries Float mem_scale_vcf_size = 25.2 Float mem_scale_num_entries = "3.7e-7" Float mem_gb_overhead = 1.5 + Int preemptible_tries = 3 } Int disk_gb = round(1000 + size([train_vcf, train_vcf_index, ped_file, truth_file], "GiB") + @@ -68,7 +71,7 @@ task TrainGqRecalibratorTask { runtime { docker: gatk_docker cpu: 1 - preemptible: 3 + preemptible: preemptible_tries max_retries: 1 memory: mem_gb + " GiB" disks: "local-disk " + disk_gb + " HDD" diff --git a/wdl/Utils.wdl b/wdl/Utils.wdl index 16efe98b9..6c8cbfe82 100644 --- a/wdl/Utils.wdl +++ b/wdl/Utils.wdl @@ -539,7 +539,7 @@ task GetVcfSize { output { Int num_records = read_int(num_records_file) Int num_samples = read_int(num_samples_file) - Int num_entries = num_records * num_samples + Float num_entries = read_float(num_records_file) * num_samples } }
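For reference, the ConcatHeaderedTextFiles task added to TasksMakeCohortVcf.wdl keeps the header line of the first shard and drops it from every subsequent shard, optionally reading and writing gzip. A rough Python sketch of the same behavior (illustrative only; the task itself uses the cat/awk loop shown above):

    import gzip

    def concat_headered(paths, out_path, gzipped=False):
        # Mirror the task's shell loop: take the header from the first file,
        # then append only the data lines (everything after line 1) of each file.
        opener = gzip.open if gzipped else open
        with opener(out_path, "wt") as out:
            for i, path in enumerate(paths):
                with opener(path, "rt") as f:
                    header = f.readline()
                    if i == 0:
                        out.write(header)
                    out.writelines(f)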