def run_shell_cmd(cmd):
    """
    Runs a shell command string through bash and returns its stdout.

    The command text is written to the stdin of ``/bin/bash -o pipefail``, so
    a failure in any stage of a pipeline makes the whole command fail. Bash is
    started in a new session, which gives it a process group of its own; on
    failure that whole group is sent SIGKILL so no child is left running.

    Args:
        cmd (str): Shell command (or multi-line script) to execute.

    Returns:
        str: Captured stdout with leading and trailing newlines removed.

    Raises:
        Exception: If bash exits with a non-zero return code. The message
            contains the PID, PGID, return code, stderr and stdout.
    """
    proc = subprocess.Popen(
        ['/bin/bash', '-o', 'pipefail'],  # pipefail: catch errors inside pipes
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        # Calls setsid() in the child (new session => new PGID), like
        # preexec_fn=os.setsid, but without running Python code between
        # fork() and exec().
        start_new_session=True)
    pid = proc.pid
    pgid = os.getpgid(pid)
    logging.info('run_shell_cmd: PID=%s, PGID=%s, CMD=%s', pid, pgid, cmd)

    stdout, stderr = proc.communicate(cmd)
    rc = proc.returncode

    # Each field goes on its own line; without the newline after RC the
    # report would read "RC=1STDERR=...".
    report = (
        'PID={pid}, PGID={pgid}, RC={rc}\n'
        'STDERR={stde}\nSTDOUT={stdo}'
    ).format(
        pid=pid, pgid=pgid, rc=rc, stde=stderr.strip(), stdo=stdout.strip()
    )

    if rc:
        # Best-effort cleanup of anything still alive in the process group.
        try:
            os.killpg(pgid, signal.SIGKILL)
        except OSError:
            # Typically ProcessLookupError: the group is already gone.
            pass
        # Raised outside any ``finally`` so that a KeyboardInterrupt or
        # SystemExit arriving during cleanup is not silently replaced.
        raise Exception(report)

    logging.info(report)
    return stdout.strip('\n')
def run_shell_cmd(cmd):
    """
    Runs a shell command string through bash and returns its stdout.

    The command text is written to the stdin of ``/bin/bash -o pipefail``, so
    a failure in any stage of a pipeline makes the whole command fail. Bash is
    started in a new session, which gives it a process group of its own; on
    failure that whole group is sent SIGKILL so no child is left running.

    Args:
        cmd (str): Shell command (or multi-line script) to execute.

    Returns:
        str: Captured stdout with leading and trailing newlines removed.

    Raises:
        Exception: If bash exits with a non-zero return code. The message
            contains the PID, PGID, return code, stderr and stdout.
    """
    proc = subprocess.Popen(
        ['/bin/bash', '-o', 'pipefail'],  # pipefail: catch errors inside pipes
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        # Calls setsid() in the child (new session => new PGID), like
        # preexec_fn=os.setsid, but without running Python code between
        # fork() and exec().
        start_new_session=True)
    pid = proc.pid
    pgid = os.getpgid(pid)
    logging.info('run_shell_cmd: PID=%s, PGID=%s, CMD=%s', pid, pgid, cmd)

    stdout, stderr = proc.communicate(cmd)
    rc = proc.returncode

    # Each field goes on its own line; without the newline after RC the
    # report would read "RC=1STDERR=...".
    report = (
        'PID={pid}, PGID={pgid}, RC={rc}\n'
        'STDERR={stde}\nSTDOUT={stdo}'
    ).format(
        pid=pid, pgid=pgid, rc=rc, stde=stderr.strip(), stdo=stdout.strip()
    )

    if rc:
        # Best-effort cleanup of anything still alive in the process group.
        try:
            os.killpg(pgid, signal.SIGKILL)
        except OSError:
            # Typically ProcessLookupError: the group is already gone.
            pass
        # Raised outside any ``finally`` so that a KeyboardInterrupt or
        # SystemExit arriving during cleanup is not silently replaced.
        raise Exception(report)

    logging.info(report)
    return stdout.strip('\n')
subprocess - cmd = f"kb count --workflow=nac -i {index_dir}/index.idx -g {index_dir}/t2g.txt -c1 {index_dir}/cdna.txt -c2 {index_dir}/nascent.txt --sum=total -x {read_format} -w {barcode_onlist} -r {replacement_list} --strand {strand} -o {output_dir} --h5ad -t {threads} {interleaved_fastqs}" + replacement_list_param = f"-r {replacement_list}" if replacement_list else "" + interleaved_fastqs_str = " ".join(interleaved_fastqs) + cmd = f"kb count --workflow=nac -i {index_dir}/index.idx -g {index_dir}/t2g.txt -c1 {index_dir}/cdna.txt -c2 {index_dir}/nascent.txt --sum=total -x {read_format} -w {barcode_onlist} {replacement_list_param} --strand {strand} -o {output_dir} --h5ad -t {threads} {interleaved_fastqs_str}" logging.info(f"Running command: {cmd}") - try: - result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True) - logging.info(f"Command output: {result.stdout}") - except subprocess.CalledProcessError as e: - logging.error(f"Command failed with error: {e.stderr}") + run_shell_cmd(cmd) # Append the subpool to the barcodes in the h5ad file if subpool: