diff --git a/src/sv-pipeline/04_variant_resolution/scripts/process_posthoc_cpx_depth_regenotyping.py b/src/sv-pipeline/04_variant_resolution/scripts/process_posthoc_cpx_depth_regenotyping.py index 00e78f5c7..8345f5146 100644 --- a/src/sv-pipeline/04_variant_resolution/scripts/process_posthoc_cpx_depth_regenotyping.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/process_posthoc_cpx_depth_regenotyping.py @@ -11,7 +11,6 @@ from pysam import VariantFile from intervaltree import IntervalTree - # Parameters MIN_SIZE = 1000 MIN_DIFF = 0.4 @@ -126,7 +125,7 @@ def __init__(self, variant_record, end_dict): # Source interval if "SOURCE" in variant_record.info: - source_tokens = variant_record.info["SOURCE"].replace("_", "\t").replace(":", "\t")\ + source_tokens = variant_record.info["SOURCE"].replace("_", "\t").replace(":", "\t") \ .replace("-", "\t").split("\t") if len(source_tokens) != 4: raise ValueError(f"Record {variant_record.vid} has SOURCE field with unexpected format: " @@ -586,17 +585,15 @@ def _evaluate_dup5_ins3(r, records_list, default_sv_type, default_cpx_type): # Test dup interval for evidence of duplication # TODO unsure if the check should be == "DUP" - dup_confirmed = any(m.cnv_assessment != "WT" - and has_reciprocal_overlap(dup_chrom, dup_start, dup_end, m.chrom, m.start, m.end, 0.95) + dup_confirmed = any(m.cnv_assessment != "WT" and + has_reciprocal_overlap(dup_chrom, dup_start, dup_end, m.chrom, m.start, m.end, 0.95) for m in records_list) # As long as dup doesn't explicitly fail depth genotyping, proceed if dup_confirmed: # If sink length >= MINSIZEiDEL, test sink interval for evidence of deletion - sink_is_del = r.sink_size() >= MIN_SIZE_IDEL \ - and any(m.cnv_assessment == "DEL" - and has_reciprocal_overlap(r.sink_chrom, r.sink_start, r.sink_end, - m.chrom, m.start, m.end, 0.95) - for m in records_list) + sink_is_del = r.sink_size() >= MIN_SIZE_IDEL and any( + m.cnv_assessment == "DEL" and has_reciprocal_overlap(r.sink_chrom, r.sink_start, r.sink_end, m.chrom, + m.start, m.end, 0.95) for m in records_list) # Get inversion interval. Corresponds to min dup/ins coord and max sink coord inv_chrom = r.sink_chrom inv_start = dup_start @@ -695,17 +692,15 @@ def _evaluate_dup3_ins5(r, records_list, default_sv_type, default_cpx_type): dup_size = dup_end - dup_start # Test dup interval for evidence of duplication # TODO should this check should be == "DUP" ? - dup_confirmed = any(m.cnv_assessment != "WT" - and has_reciprocal_overlap(dup_chrom, dup_start, dup_end, m.chrom, m.start, m.end, 0.95) - for m in records_list) + dup_confirmed = any( + m.cnv_assessment != "WT" and has_reciprocal_overlap(dup_chrom, dup_start, dup_end, m.chrom, m.start, m.end, + 0.95) for m in records_list) # As long as dup doesn't explicitly fail depth genotyping, proceed if dup_confirmed: # If sink length >= MINSIZEiDEL, test sink interval for evidence of deletion - sink_is_del = r.sink_size() >= MIN_SIZE_IDEL \ - and any(m.cnv_assessment == "DEL" - and has_reciprocal_overlap(r.sink_chrom, r.sink_start, r.sink_end, - m.chrom, m.start, m.end, 0.95) - for m in records_list) + sink_is_del = r.sink_size() >= MIN_SIZE_IDEL and any( + m.cnv_assessment == "DEL" and has_reciprocal_overlap(r.sink_chrom, r.sink_start, r.sink_end, m.chrom, + m.start, m.end, 0.95) for m in records_list) # Get inversion interval. Corresponds to min sink coord and max dup/inv coord # Note this is different from the DUP5/INS3 case inv_chrom = r.sink_chrom @@ -809,14 +804,17 @@ def _evaluate_ins_idel(r, records_list, default_sv_type, default_cpx_type): sink_record = sinks[0] if sink_record.size() >= MIN_SIZE_IDEL: # If insertion site size >= MINSIZEiDEL, check if insertion site is deleted - sink_is_del = any(m.cnv_assessment == "DEL" - and not has_reciprocal_overlap(r.sink_chrom, r.sink_start, r.sink_end, - m.chrom, m.start, m.end, 0.95) - for m in records_list) - sink_is_not_del = any(m.cnv_assessment == "WT" - and not has_reciprocal_overlap(r.sink_chrom, r.sink_start, r.sink_end, - m.chrom, m.start, m.end, 0.95) - for m in records_list) + sink_is_del = any( + m.cnv_assessment == "DEL" and not has_reciprocal_overlap( + r.sink_chrom, r.sink_start, r.sink_end, + m.chrom, m.start, m.end, 0.95) + for m in records_list + ) + sink_is_not_del = any( + m.cnv_assessment == "WT" and not has_reciprocal_overlap( + r.sink_chrom, r.sink_start, r.sink_end, m.chrom, m.start, m.end, 0.95) + for m in records_list + ) else: sink_is_del = False sink_is_not_del = False @@ -902,9 +900,11 @@ def _evaluate_bnd(r, records_list, default_sv_type, default_cpx_type): dup_end = r.sink_end dup_size = dup_end - dup_start # Test dup interval for explicit confirmation of duplication - dup_confirmed = any(m.cnv_assessment == "DUP" - and has_reciprocal_overlap(dup_chrom, dup_start, dup_end, m.chrom, m.start, m.end, 0.95) - for m in records_list) + dup_confirmed = any( + m.cnv_assessment == "DUP" and has_reciprocal_overlap(dup_chrom, dup_start, dup_end, + m.chrom, m.start, m.end, 0.95) + for m in records_list + ) if dup_confirmed: # If dup confirms, resolve as palindromic inverted duplication new_cpx_type = "piDUP_FR" if default_cpx_type == "INVERSION_SINGLE_ENDER_" else "piDUP_RF" @@ -1111,6 +1111,7 @@ def write_reclassification_table(path, assessment): f.write(f"{r.vid}\t{r.modification}\t{r.reason}\t{r.new_sv_type}\t{r.new_cpx_type}\t{new_cpx_intervals}\t" f"{new_svlen}\t{new_source}\t{new_start}\t{new_end}\n") + def _parse_arguments(argv: List[Text]) -> argparse.Namespace: # noinspection PyTypeChecker parser = argparse.ArgumentParser(