Existing output directory #660

Draft: wants to merge 21 commits into base: develop
Commits (21)
fb91f20
Tests to skip module (MCCD PSF) if output file exists
martinkilbinger Sep 5, 2023
ac061c0
Merge remote-tracking branch 'upstream/develop' into output_exists
martinkilbinger Sep 17, 2023
a93546e
n_smp for more jobs; N_EPOCH bugs; ngmix checking for existing output
martinkilbinger Sep 17, 2023
1ed596c
numbering scheme with re pattern: copied, not changed
martinkilbinger Sep 24, 2023
ba4aa19
numbering scheme with re pattern: copied, not changed
martinkilbinger Sep 24, 2023
776943c
removed galsim from job script message
martinkilbinger Sep 24, 2023
3216eb6
Fixed import typo
martinkilbinger Oct 16, 2023
eeadfbf
ngmix template
martinkilbinger Oct 18, 2023
ee90dc1
testing openmpi 5.0.0 on candide
martinkilbinger Oct 28, 2023
22c2f56
Testing MPI on candide; errors with process list
martinkilbinger Oct 31, 2023
27e9bbb
Updated MPI setting and candide job
martinkilbinger Oct 31, 2023
c380cd4
ngmix runner reset to develop
martinkilbinger Oct 31, 2023
13940f5
ngmix script reset to develop
martinkilbinger Oct 31, 2023
0bec318
mccd and pysap dependencies added back in to example
martinkilbinger Oct 31, 2023
691daf3
submit run added missing arg
martinkilbinger Oct 31, 2023
3712734
config mpi
martinkilbinger Oct 31, 2023
ddf4091
removed debug prints
martinkilbinger Nov 6, 2023
3fb5a25
mpi4py upgraded to 3.1.5
martinkilbinger Nov 6, 2023
6a76454
changed warning print output
martinkilbinger Nov 6, 2023
7ac5042
ngmix_runner checked out from develop
martinkilbinger Nov 6, 2023
bfcdb78
Added testing of mmap file existence
martinkilbinger Nov 8, 2023
Files changed
4 changes: 1 addition & 3 deletions environment.yml
@@ -4,7 +4,7 @@ channels:
dependencies:
- python=3.9
- pip>=21.2.4
- numpy==1.21.6
- numpy==1.22
- astropy==5.0
- automake==1.16.2
- autoconf==2.69
@@ -17,11 +17,9 @@ dependencies:
- pandas==1.4.1
- pip:
- cs_util==0.0.5
- mccd==1.2.3
- modopt==1.6.0
- PyQt5==5.15.6
- pyqtgraph==0.12.4
- python-pysap==0.0.6
- reproject==0.8
- sip_tpv==1.1
- sf_tools==2.0.4
2 changes: 1 addition & 1 deletion example/cfis/config_tile_MiViSmVi.ini
@@ -47,7 +47,7 @@ OUTPUT_DIR = $SP_RUN/output
[JOB]

# Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial
SMP_BATCH_SIZE = 12
SMP_BATCH_SIZE = 1

# Timeout value (optional), default is None, i.e. no timeout limit applied
TIMEOUT = 96:00:00
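
The [JOB] comment above spells out the batch-size semantics: SMP_BATCH_SIZE is the number of jobs processed in parallel on one machine, and the value 1 set here means fully serial execution. A conceptual sketch of that behaviour (ShapePipe's actual SMP scheduler may differ; the Pool-based loop and the tile IDs below are only illustrative):

    # Conceptual sketch: SMP_BATCH_SIZE parallel workers; a value of 1
    # reduces to running all jobs in serial.
    from multiprocessing import Pool

    def run_job(tile_id):
        # Placeholder for one ShapePipe job on a single tile
        return f"processed tile {tile_id}"

    if __name__ == "__main__":
        smp_batch_size = 1  # value from the [JOB] section above
        tile_ids = ["-000-000", "-000-001", "-000-002"]
        with Pool(processes=smp_batch_size) as pool:
            for result in pool.map(run_job, tile_ids):
                print(result)
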
7 changes: 4 additions & 3 deletions example/cfis/config_tile_Ng_template.ini
@@ -21,7 +21,7 @@ RUN_DATETIME = False
MODULE = ngmix_runner

# Parallel processing mode, SMP or MPI
MODE = SMP
MODE = MPI


## ShapePipe file handling options
@@ -44,10 +44,10 @@ OUTPUT_DIR = $SP_RUN/output
[JOB]

# Batch size of parallel processing (optional), default is 1, i.e. run all jobs in serial
SMP_BATCH_SIZE = 1
SMP_BATCH_SIZE = 8

# Timeout value (optional), default is None, i.e. no timeout limit applied
TIMEOUT = 96:00:00
TIMEOUT = 40:00:00


## Module options
@@ -62,6 +62,7 @@ FILE_PATTERN = sexcat, image_vignet, background_vignet, galaxy_psf, weight_vigne
FILE_EXT = .fits, .sqlite, .sqlite, .sqlite, .sqlite, .sqlite

# NUMBERING_SCHEME (optional) string with numbering pattern for input files
#NUMBERING_SCHEME = \d{0}-\d{3}-\d{2}9
NUMBERING_SCHEME = -000-000

# Multi-epoch mode: Path to file with single-exposure WCS header information
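
The NUMBERING_SCHEME entry keeps the literal default -000-000, while the commented-out line above it sketches a regular-expression form (the commit history mentions an re-pattern numbering scheme). A minimal illustration of how such a pattern could be matched against input file names; the pattern and file names are invented for the example:

    # Illustration: matching a numbering pattern against input file names.
    import re

    # Hypothetical regex form of a tile numbering scheme such as "-000-000"
    numbering_scheme = r"-\d{3}-\d{3}"

    for name in ["sexcat-271-282.fits", "galaxy_psf-000-000.sqlite", "image.fits"]:
        match = re.search(numbering_scheme, name)
        print(name, "->", match.group(0) if match else "no match")
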
2 changes: 1 addition & 1 deletion example/config.ini
@@ -14,7 +14,7 @@
# MODULE (required) must be a valid module runner name (or a comma separated list of names)
MODULE = python_example_runner, serial_example_runner, execute_example_runner, python_example_runner, execute_example_runner
# MODE (optional) options are smp or mpi, default is smp
; MODE = mpi
MODE = mpi

## ShapePipe file handling options
[FILE]
Empty file removed: example/output/.gitkeep
24 changes: 20 additions & 4 deletions example/pbs/candide_mpi.sh
@@ -27,14 +27,30 @@ export SPENV="$HOME/.conda/envs/shapepipe"
export SPDIR="$HOME/shapepipe"

# Load modules
module load intelpython/3
module load openmpi/4.0.5
module load gcc/9.3.0
module load intelpython/3-2023.1.0
module load openmpi/5.0.0

# Activate conda environment
source activate $SPENV

# Run ShapePipe using full paths to executables
$SPENV/bin/mpiexec --map-by node $SPENV/bin/shapepipe_run -c $SPDIR/example/config_mpi.ini
# Other options to test
# -map-by

if [ -f "$PBS_NODEFILE" ]; then
    NSLOTS=`cat $PBS_NODEFILE | wc -l`
    echo "Using $NSLOTS CPUs from PBS_NODEFILE $PBS_NODEFILE"
else
    NSLOTS=4
    echo "Using $NSLOTS CPUs set by hand"
fi

# Note: this creates as many output directories as there are nodes
MPI_CMD=/softs/openmpi/5.0.0-torque-CentOS7/bin/mpirun
MPI_ARGS="-np $NSLOTS"

${MPI_CMD} ${MPI_ARGS} hostname
${MPI_CMD} ${MPI_ARGS} $SPENV/bin/shapepipe_run -c $SPDIR/example/pbs/config_mpi.ini

# Return exit code
exit 0
6 changes: 3 additions & 3 deletions example/pbs/config_mpi.ini
@@ -2,7 +2,7 @@

## ShapePipe execution options
[EXECUTION]
MODULE = python_example, serial_example, execute_example
MODULE = python_example_runner, serial_example_runner, execute_example_runner
MODE = mpi

## ShapePipe file handling options
@@ -15,8 +15,8 @@ OUTPUT_DIR = $SPDIR/example/output
TIMEOUT = 00:01:35

## Module options
[PYTHON_EXAMPLE]
[PYTHON_EXAMPLE_RUNNER]
MESSAGE = The obtained value is:

[SERIAL_EXAMPLE]
[SERIAL_EXAMPLE_RUNNER]
ADD_INPUT_DIR = $SPDIR/example/data/numbers, $SPDIR/example/data/letters
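
The section renames above align the module option sections with the runner names listed under MODULE (for example, python_example_runner reads its options from [PYTHON_EXAMPLE_RUNNER]). A minimal configparser sketch of that lookup, assuming the section name is simply the runner name in upper case; the mapping shown is an assumption for illustration, not necessarily ShapePipe's exact rule:

    # Sketch, assuming "[<RUNNER NAME IN UPPER CASE>]" holds a module's options.
    from configparser import ConfigParser

    config = ConfigParser()
    config.read("example/pbs/config_mpi.ini")

    module = "python_example_runner"
    section = module.upper()  # assumed mapping -> PYTHON_EXAMPLE_RUNNER
    if config.has_section(section):
        print(section, "MESSAGE =", config.get(section, "MESSAGE"))
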
2 changes: 1 addition & 1 deletion install_shapepipe
@@ -22,7 +22,7 @@ last_update="08/03/22"
# Conda package versions
fftw_ver="3.3.10"
libpng_ver="1.6.37"
mpi4py_ver="3.1.3"
mpi4py_ver="3.1.5"
openblas_ver="0.3.18"

# SExtractor Package
6 changes: 3 additions & 3 deletions scripts/sh/job_sp.bash
@@ -420,7 +420,7 @@ if [[ $do_job != 0 ]]; then
### Star detection, selection, PSF model. setools can exit with an error for CCD with insufficient stars,
### the script should continue
STOP=0
command_sp "shapepipe_run -c $SP_CONFIG/config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)"
command_cfg_shapepipe "config_tile_Sx_exp_${psf}.ini" "Run shapepipe (tile detection, exp $psf)" $n_smp
STOP=1

fi
@@ -432,7 +432,7 @@ if [[ $do_job != 0 ]]; then
### PSF model letter: 'P' (psfex) or 'M' (mccd)
letter=${psf:0:1}
Letter=${letter^}
command_sp "shapepipe_run -c $SP_CONFIG/config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)"
command_cfg_shapepipe "config_tile_${Letter}iViSmVi.ini" "Run shapepipe (tile PsfInterp=$Letter}: up to ngmix+galsim)" $n_smp

fi

@@ -464,7 +464,7 @@ if [[ $do_job != 0 ]]; then
### Shapes, run $nsh_jobs parallel processes
VERBOSE=0
for k in $(seq 1 $nsh_jobs); do
command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix+galsim $k)" &
command_sp "shapepipe_run -c $SP_CONFIG_MOD/config_tile_Ng${k}u.ini" "Run shapepipe (tile: ngmix $k)" &
done
wait
VERBOSE=1
2 changes: 1 addition & 1 deletion shapepipe/modules/make_cat_package/make_cat.py
@@ -16,7 +16,7 @@
from sqlitedict import SqliteDict

from shapepipe.pipeline import file_io
from shapepipe.utitities import galaxy
from shapepipe.utilities import galaxy


def prepare_final_cat_file(output_path, file_number_string):
@@ -395,7 +395,7 @@ def _interpolate_me(self):

all_id = np.copy(cat.get_data()['NUMBER'])
key_ne = 'N_EPOCH'
if key_ne not in cat.get_data():
if key_ne not in cat.get_data().dtype.names:
raise KeyError(
f'Key {key_ne} not found in input galaxy catalogue, needed for'
+ ' PSF interpolation to multi-epoch data; run previous module'
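
The one-line change above replaces a membership test on the catalogue data with a test on its column names. The distinction matters for NumPy structured arrays: "in" on the array itself compares against the stored values (with version-dependent behaviour), whereas dtype.names lists the columns. A small self-contained illustration:

    # Why the check uses dtype.names: column names are not "in" the array itself.
    import numpy as np

    cat_data = np.array(
        [(1, 3), (2, 3)],
        dtype=[("NUMBER", int), ("N_EPOCH", int)],
    )

    print("N_EPOCH" in cat_data.dtype.names)  # True: column-name lookup
    # "N_EPOCH" in cat_data would compare the string against the row values,
    # which is not a column test and can warn or raise depending on NumPy.
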
8 changes: 8 additions & 0 deletions shapepipe/modules/merge_sep_cats_package/merge_sep_cats.py
@@ -89,6 +89,14 @@ def process(self):
list_col_name = cat0.get_col_names()
cat0.close()

# Check that exactly 6 HDUs exist (1 primary + 5 secondary)
if len(list_ext_name) != 6:
    msg = f"Number of HDUs is {len(list_ext_name)}, expected 6"
    # raise ValueError(msg)
    print(f"warning: {msg}; using only the first 6")
    list_ext_name = list_ext_name[:6]
    list_col_name = list_col_name[:6]

# Create empty dictionary
# data dimension = n_extension x n_column x n_obj
data = {}
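
The guard added above expects six HDUs per separate catalogue (one primary plus five extensions) and keeps only the first six otherwise. A quick way to inspect the HDU layout of such a catalogue, shown here with astropy rather than ShapePipe's file_io wrapper and with a hypothetical file name:

    # Inspect HDU count and extension names of a FITS catalogue.
    from astropy.io import fits

    with fits.open("ngmix-0000000.fits") as hdu_list:  # hypothetical file name
        print(len(hdu_list), "HDUs:", [hdu.name for hdu in hdu_list])
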
14 changes: 11 additions & 3 deletions shapepipe/pipeline/file_handler.py
@@ -10,6 +10,7 @@
import re
from functools import partial, reduce
from shutil import copyfile
import time

import numpy as np

@@ -93,7 +94,7 @@ def run_dir(self):
@run_dir.setter
def run_dir(self, value):

self._run_dir = self.check_dir(value, check_exists=True)
self._run_dir = self.check_dir(value, check_exists=False)

@property
def _input_dir(self):
@@ -188,7 +189,7 @@ def mkdir(cls, dir_name):
Directory name with full path

"""
cls.check_dir(dir_name, check_exists=True)
cls.check_dir(dir_name, check_exists=False)
mkdir(dir_name)

@staticmethod
@@ -1006,7 +1007,14 @@ def _save_match_patterns(output_file, mmap_list):
List of memory maps

"""
num_pattern_list = [np.load(mmap, mmap_mode='r') for mmap in mmap_list]
num_pattern_list = []
for mmap in mmap_list:
    if not os.path.exists(mmap):
        n_sec = 5
        time.sleep(n_sec)
        if not os.path.exists(mmap):
            print("MKDEBUG still not found")
    num_pattern_list.append(np.load(mmap, mmap_mode="r"))

np.save(
output_file,
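
The new loop above waits a fixed five seconds for a missing memory-map file before loading it. A more general polling helper, sketched as a possible alternative (wait_for_file and its defaults are not part of ShapePipe):

    # Sketch: poll for a file with a timeout instead of a single fixed sleep.
    import os
    import time

    def wait_for_file(path, timeout=30.0, poll=0.5):
        """Return True once `path` exists, False if `timeout` seconds pass."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if os.path.exists(path):
                return True
            time.sleep(poll)
        return os.path.exists(path)
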
2 changes: 2 additions & 0 deletions shapepipe/pipeline/mpi_run.py
@@ -36,6 +36,7 @@ def submit_mpi_jobs(
timeout,
run_dirs,
module_runner,
module_config_sec,
worker_log,
verbose,
):
@@ -57,6 +58,7 @@
w_log_name,
run_dirs,
config,
module_config_sec,
timeout,
module_runner
))
11 changes: 8 additions & 3 deletions shapepipe/run.py
@@ -405,15 +405,16 @@ def run_mpi(pipe, comm):
# Get file handler objects
run_dirs = jh.filehd.module_run_dirs
module_runner = jh.filehd.module_runners[module]
module_config_sec = jh.filehd.get_module_config_sec(module)
worker_log = jh.filehd.get_worker_log_name
# Define process list
process_list = jh.filehd.process_list
# Define job list
jobs = split_mpi_jobs(process_list, comm.size)
del process_list
else:
job_type = module_runner = worker_log = timeout = \
jobs = run_dirs = None
job_type = module_runner = module_config_sec = worker_log = \
timeout = jobs = run_dirs = None

# Broadcast job type to all nodes
job_type = comm.bcast(job_type, root=0)
@@ -424,6 +425,7 @@
run_dirs = comm.bcast(run_dirs, root=0)

module_runner = comm.bcast(module_runner, root=0)
module_config_sec = comm.bcast(module_config_sec, root=0)
worker_log = comm.bcast(worker_log, root=0)
timeout = comm.bcast(timeout, root=0)
jobs = comm.scatter(jobs, root=0)
@@ -436,14 +438,15 @@
timeout,
run_dirs,
module_runner,
module_config_sec,
worker_log,
verbose
),
root=0,
)

# Delete broadcast objects
del module_runner, worker_log, timeout, jobs
del module_runner, module_config_sec, worker_log, timeout, jobs

# Finish up parallel jobs
if master:
@@ -474,12 +477,14 @@ def run(*args):

if import_mpi:
comm = MPI.COMM_WORLD
print(f"MKDEBUG comm = {comm}")
master = comm.rank == 0
else:
master = True

if master:
pipe = ShapePipe()
print("MKDEBUG set_up (-> mkdir out)")
pipe.set_up()
mode = pipe.mode
else:
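
The run.py changes above thread the new module_config_sec value through the same master-to-worker broadcast used for the other job metadata, before the job list is scattered across ranks. A minimal mpi4py sketch of that bcast/scatter pattern; the section name and job list are invented for the example:

    # Run with, e.g.: mpiexec -np 4 python sketch.py
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    master = comm.rank == 0

    if master:
        module_config_sec = "NGMIX_RUNNER"               # hypothetical section
        jobs = [[f"job-{i}"] for i in range(comm.size)]  # one chunk per rank
    else:
        module_config_sec = jobs = None

    module_config_sec = comm.bcast(module_config_sec, root=0)
    my_jobs = comm.scatter(jobs, root=0)
    print(f"rank {comm.rank}: section={module_config_sec}, jobs={my_jobs}")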