Skip to content

Commit 5191229

Browse files
committed
single job for spms
1 parent 8cce71e commit 5191229

File tree

5 files changed

+143
-85
lines changed

5 files changed

+143
-85
lines changed

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds
118118
par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck"
119119
par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser"
120120
par-spms-dsp-trg-thr = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr"
121+
par-spms-dsp-trg-thr-multi = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr_multi"
121122

122123
[tool.uv.workspace]
123124
exclude = ["generated", "inputs", "software", "workflow"]

workflow/rules/chanlist_gen.smk

+15-11
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,10 @@ from legenddataflow import execenv_pyexe
1313
from legenddataflow.utils import filelist_path
1414

1515

16-
# FIXME: the system argument should always be explicitly supplied
17-
def get_chanlist(
18-
setup, keypart, workflow, config, det_status, chan_maps, system="geds"
19-
):
16+
def get_chanlist(config, keypart, workflow, det_status, chan_maps, system):
2017
key = ChannelProcKey.parse_keypart(keypart)
2118

22-
flist_path = filelist_path(setup)
19+
flist_path = filelist_path(config)
2320
os.makedirs(flist_path, exist_ok=True)
2421
output_file = os.path.join(
2522
flist_path,
@@ -45,15 +42,13 @@ def get_par_chanlist(
4542
basedir,
4643
det_status,
4744
chan_maps,
45+
system,
4846
datatype="cal",
49-
system="geds",
5047
name=None,
5148
extension="yaml",
5249
):
5350

54-
chan_list = get_chanlist(
55-
setup, keypart, workflow, config, det_status, chan_maps, system
56-
)
51+
chan_list = get_chanlist(setup, keypart, workflow, det_status, chan_maps, system)
5752

5853
par_pattern = get_pattern_pars_tmp_channel(
5954
setup, tier, name, datatype=datatype, extension=extension
@@ -64,9 +59,18 @@ def get_par_chanlist(
6459
return filenames
6560

6661

67-
def get_plt_chanlist(setup, keypart, tier, basedir, det_status, chan_maps, name=None):
62+
def get_plt_chanlist(
63+
setup,
64+
keypart,
65+
tier,
66+
basedir,
67+
det_status,
68+
chan_maps,
69+
system,
70+
name=None,
71+
):
6872

69-
chan_list = get_chanlist(setup, keypart, workflow, config, det_status, chan_maps)
73+
chan_list = get_chanlist(setup, keypart, workflow, det_status, chan_maps, system)
7074

7175
par_pattern = get_pattern_plts_tmp_channel(setup, tier, name)
7276

workflow/rules/channel_merge.smk

+5-29
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ from legenddataflow.utils import set_last_rule_name
55
from legenddataflow.execenv import execenv_pyexe
66

77

8-
def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
8+
def build_merge_rules(tier, lh5_merge=False, lh5_tier=None, system="geds"):
99
if lh5_tier is None:
1010
lh5_tier = tier
1111
rule:
@@ -17,6 +17,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
1717
basedir,
1818
det_status,
1919
chan_maps,
20+
system=system,
2021
),
2122
output:
2223
patterns.get_pattern_plts(config, tier),
@@ -38,6 +39,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
3839
basedir,
3940
det_status,
4041
chan_maps,
42+
system=system,
4143
name="objects",
4244
extension="pkl",
4345
),
@@ -68,6 +70,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
6870
basedir,
6971
det_status,
7072
chan_maps,
73+
system=system,
7174
),
7275
output:
7376
temp(
@@ -86,34 +89,6 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
8689

8790
set_last_rule_name(workflow, f"build_pars_{tier}_db")
8891

89-
rule:
90-
"""Merge pars for SiPM channels in a single pars file."""
91-
input:
92-
lambda wildcards: get_par_chanlist(
93-
config,
94-
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
95-
tier,
96-
basedir,
97-
det_status,
98-
chan_maps,
99-
datatype=wildcards.datatype,
100-
system="spms"
101-
),
102-
output:
103-
patterns.get_pattern_pars(
104-
config,
105-
tier,
106-
name="spms",
107-
datatype="{datatype}",
108-
),
109-
group:
110-
f"merge-{tier}"
111-
shell:
112-
execenv_pyexe(config, "merge-channels") + \
113-
"--input {input} "
114-
"--output {output} "
115-
116-
set_last_rule_name(workflow, f"build_pars_spms_{tier}_db")
11792

11893
rule:
11994
input:
@@ -124,6 +99,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
12499
basedir,
125100
det_status,
126101
chan_maps,
102+
system=system,
127103
extension="lh5" if lh5_merge is True else inspect.signature(get_par_chanlist).parameters['extension'].default,
128104
),
129105
in_db=patterns.get_pattern_pars_tmp(

workflow/rules/dsp_pars_spms.smk

+29-12
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,47 @@ rule build_pars_dsp_tau_spms:
1313
params:
1414
timestamp="{timestamp}",
1515
datatype="{datatype}",
16-
channel="{channel}",
17-
raw_table_name=lambda wildcards: get_table_name(
18-
metadata,
16+
channels=lambda wildcards: get_chanlist(
1917
config,
20-
wildcards.datatype,
21-
wildcards.timestamp,
22-
wildcards.channel,
23-
"raw",
18+
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
19+
workflow,
20+
det_status,
21+
chan_maps,
22+
system="spms",
2423
),
24+
raw_table_names=lambda wildcards: [
25+
get_table_name(
26+
metadata,
27+
config,
28+
wildcards.datatype,
29+
wildcards.timestamp,
30+
channel,
31+
"raw",
32+
)
33+
for channel in get_chanlist(
34+
config,
35+
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
36+
workflow,
37+
det_status,
38+
chan_maps,
39+
system="spms",
40+
)
41+
],
2542
wildcard_constraints:
2643
datatype=r"\b(?!cal\b|xtc\b)\w+\b",
2744
output:
28-
temp(patt.get_pattern_pars_tmp_channel(config, "dsp", datatype="{datatype}")),
45+
patt.get_pattern_pars(config, "dsp", name="spms", datatype="{datatype}"),
2946
log:
30-
patt.get_pattern_log_channel(config, "pars_spms", time, datatype="{datatype}"),
47+
patt.get_pattern_log(config, "pars_spms", time, datatype="{datatype}"),
3148
group:
3249
"par-dsp"
3350
shell:
34-
execenv_pyexe(config, "par-spms-dsp-trg-thr") + "--config-path {configs} "
51+
execenv_pyexe(config, "par-spms-dsp-trg-thr-multi") + "--config-path {configs} "
3552
"--raw-file {input.raw_file} "
3653
"--dsp-db {input.pardb} "
3754
"--datatype {params.datatype} "
3855
"--timestamp {params.timestamp} "
39-
"--sipm-name {params.channel} "
40-
"--raw-table-name {params.raw_table_name} "
56+
"--sipm-names {params.channels} "
57+
"--raw-table-names {params.raw_table_names} "
4158
"--output-file {output} "
4259
"--logfile {log} "

workflow/src/legenddataflow/scripts/par/spms/dsp/trigger_threshold.py

+93-33
Original file line numberDiff line numberDiff line change
@@ -11,71 +11,48 @@
1111
from .....log import build_log
1212

1313

14-
def par_spms_dsp_trg_thr() -> None:
15-
# CLI interface
16-
argparser = argparse.ArgumentParser()
17-
argparser.add_argument("--raw-file", required=True)
18-
argparser.add_argument("--raw-table-name", required=True)
19-
argparser.add_argument("--output-file", required=True)
20-
argparser.add_argument("--config-path", required=True)
21-
argparser.add_argument("--datatype", required=True)
22-
argparser.add_argument("--timestamp", required=True)
23-
argparser.add_argument("--sipm-name", required=True)
24-
argparser.add_argument("--dsp-db", nargs="*", default=[])
25-
argparser.add_argument("--logfile")
26-
args = argparser.parse_args()
27-
28-
# dataflow configs
29-
df_configs = (
30-
TextDB(args.config_path, lazy=True)
31-
.on(args.timestamp, system=args.datatype)
32-
.snakemake_rules.pars_spms_dsp_trg_thr
33-
)
34-
35-
# setup logging
36-
log = build_log(df_configs, args.logfile)
37-
14+
def get_channel_trg_thr(df_configs, sipm_name, dsp_db, raw_file, raw_table_name, log):
3815
log.debug("reading in the configuration files")
3916
config = df_configs.inputs
4017
dsp_config = utils.load_dict(
41-
cfgtools.get_channel_config(config.processing_chain, args.sipm_name)
18+
cfgtools.get_channel_config(config.processing_chain, sipm_name)
4219
)
4320
settings = AttrsDict(
44-
utils.load_dict(cfgtools.get_channel_config(config.settings, args.sipm_name))
21+
utils.load_dict(cfgtools.get_channel_config(config.settings, sipm_name))
4522
)
4623

4724
# get DSP database from overrides
48-
_db_dict = Props.read_from(args.dsp_db).get(args.sipm_name, {})
25+
_db_dict = Props.read_from(dsp_db).get(sipm_name, {})
4926

5027
fwhm = None
5128

5229
# read raw file list
5330
log.debug("reading in the raw waveforms")
54-
if len(lh5.ls(args.raw_file, f"{args.raw_table_name}/waveform_bit_drop")) == 0:
31+
if len(lh5.ls(raw_file, f"{raw_table_name}/waveform_bit_drop")) == 0:
5532
msg = (
56-
f"could not find waveform '{args.raw_table_name}/waveform_bit_drop' "
33+
f"could not find waveform '{raw_table_name}/waveform_bit_drop' "
5734
"in {args.raw_file}, returning null pars"
5835
)
5936
log.warning(msg)
6037

6138
else:
6239
data = lh5.read(
63-
args.raw_table_name,
64-
args.raw_file,
40+
raw_table_name,
41+
raw_file,
6542
field_mask=["waveform_bit_drop"],
6643
n_rows=settings.n_events,
6744
)
6845

6946
if len(data) == 0:
7047
msg = (
71-
f"could not find any waveforms '{args.raw_table_name}/waveform_bit_drop' "
48+
f"could not find any waveforms '{raw_table_name}/waveform_bit_drop' "
7249
"in {args.raw_file}, returning null pars"
7350
)
7451
log.warning(msg)
7552

7653
elif len(data) < settings.n_events:
7754
msg = (
78-
f"number of waveforms '{args.raw_table_name}/waveform_bit_drop' < {settings.n_events}"
55+
f"number of waveforms '{raw_table_name}/waveform_bit_drop' < {settings.n_events}"
7956
"in {args.raw_file}, can't build histogram"
8057
)
8158
raise RuntimeError(msg)
@@ -112,9 +89,92 @@ def par_spms_dsp_trg_thr() -> None:
11289
msg = f"determined FWHM of baseline derivative distribution is so <= 0: {fwhm:.3f}"
11390
raise RuntimeError(msg)
11491

92+
return fwhm
93+
return None
94+
95+
96+
def par_spms_dsp_trg_thr() -> None:
    """CLI entry point: compute the trigger threshold for a single SiPM channel.

    Parses the command line, loads the rule-specific dataflow configuration
    for the requested timestamp/datatype, delegates the FWHM estimation of
    the baseline-derivative distribution to ``get_channel_trg_thr`` and
    writes the result to ``--output-file``.
    """
    parser = argparse.ArgumentParser()
    # required string options, in CLI-documentation order
    for flag in (
        "--raw-file",
        "--raw-table-name",
        "--output-file",
        "--config-path",
        "--datatype",
        "--timestamp",
        "--sipm-name",
    ):
        parser.add_argument(flag, required=True)
    parser.add_argument("--dsp-db", nargs="*", default=[])
    parser.add_argument("--logfile")
    args = parser.parse_args()

    # rule-specific dataflow configuration valid at this timestamp/datatype
    rule_cfg = (
        TextDB(args.config_path, lazy=True)
        .on(args.timestamp, system=args.datatype)
        .snakemake_rules.pars_spms_dsp_trg_thr
    )

    # set up logging as configured for this rule
    log = build_log(rule_cfg, args.logfile)

    fwhm = get_channel_trg_thr(
        rule_cfg,
        args.sipm_name,
        args.dsp_db,
        args.raw_file,
        args.raw_table_name,
        log,
    )

    log.debug(f"writing out baseline_curr_fwhm = {fwhm}")
    Path(args.output_file).parent.mkdir(parents=True, exist_ok=True)
    # fwhm may be None (no usable waveforms found); pass it through unchanged
    Props.write_to(
        args.output_file,
        {"baseline_curr_fwhm": float(fwhm) if fwhm is not None else fwhm},
    )
135+
136+
137+
def par_spms_dsp_trg_thr_multi() -> None:
    """CLI entry point: compute trigger thresholds for many SiPM channels.

    Single-job variant of ``par_spms_dsp_trg_thr``: iterates over the
    ``--sipm-names`` / ``--raw-table-names`` pairs, computes each channel's
    baseline-derivative FWHM via ``get_channel_trg_thr`` and writes one
    combined pars file keyed by channel name to ``--output-file``.

    Raises:
        ValueError: if ``--sipm-names`` and ``--raw-table-names`` have
            different lengths (they must correspond element-wise).
    """
    # CLI interface
    argparser = argparse.ArgumentParser()
    argparser.add_argument("--raw-file", required=True)
    argparser.add_argument("--raw-table-names", required=True, nargs="*")
    argparser.add_argument("--output-file", required=True)
    argparser.add_argument("--config-path", required=True)
    argparser.add_argument("--datatype", required=True)
    argparser.add_argument("--timestamp", required=True)
    argparser.add_argument("--sipm-names", required=True, nargs="*")
    argparser.add_argument("--dsp-db", nargs="*", default=[])
    argparser.add_argument("--logfile")
    args = argparser.parse_args()

    # zip() would silently truncate on a length mismatch, dropping channels
    # from the output pars file — fail loudly instead
    if len(args.sipm_names) != len(args.raw_table_names):
        msg = (
            f"--sipm-names ({len(args.sipm_names)}) and "
            f"--raw-table-names ({len(args.raw_table_names)}) "
            "must have the same length"
        )
        raise ValueError(msg)

    # dataflow configs
    df_configs = (
        TextDB(args.config_path, lazy=True)
        .on(args.timestamp, system=args.datatype)
        .snakemake_rules.pars_spms_dsp_trg_thr
    )

    # setup logging
    log = build_log(df_configs, args.logfile)

    out_dict = {}
    for sipm_name, raw_table_name in zip(args.sipm_names, args.raw_table_names):
        fwhm = get_channel_trg_thr(
            df_configs,
            sipm_name,
            args.dsp_db,
            args.raw_file,
            raw_table_name,
            log,
        )

        log.debug(f"baseline_curr_fwhm for {sipm_name} = {fwhm}")
        # fwhm may be None (no usable waveforms for this channel)
        out_dict[sipm_name] = {
            "baseline_curr_fwhm": float(fwhm) if fwhm is not None else fwhm
        }

    Path(args.output_file).parent.mkdir(parents=True, exist_ok=True)
    Props.write_to(
        args.output_file,
        out_dict,
    )

0 commit comments

Comments
 (0)