Skip to content

Commit 1528085

Browse files
authored
Merge pull request #117 from legend-exp/spm_speedup
Spm speedup
2 parents b15a0b3 + 3e5f23d commit 1528085

14 files changed

+338
-159
lines changed

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds
118118
par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck"
119119
par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser"
120120
par-spms-dsp-trg-thr = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr"
121+
par-spms-dsp-trg-thr-multi = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr_multi"
121122

122123
[tool.uv.workspace]
123124
exclude = ["generated", "inputs", "software", "workflow"]

workflow/Snakefile

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ from datetime import datetime
1818
from collections import OrderedDict
1919
import logging
2020

21-
from dbetto import AttrsDict
21+
from dbetto import AttrsDict, TextDB
2222
from legendmeta import LegendMetadata
2323
from legenddataflow import CalGrouping, execenv, utils
2424
from legenddataflow.patterns import get_pattern_tier
25+
from legenddataflow.pre_compile_catalog import pre_compile_catalog
2526

2627
utils.subst_vars_in_snakemake_config(workflow, config)
2728
config = AttrsDict(config)
@@ -33,6 +34,9 @@ meta = utils.metadata_path(config)
3334
det_status = utils.det_status_path(config)
3435
basedir = workflow.basedir
3536

37+
det_status_textdb = pre_compile_catalog(Path(det_status) / "statuses")
38+
channelmap_textdb = pre_compile_catalog(Path(chan_maps) / "channelmaps")
39+
3640
time = datetime.now().strftime("%Y%m%dT%H%M%SZ")
3741

3842
# NOTE: this will attempt a clone of legend-metadata, if the directory does not exist

workflow/rules/chanlist_gen.smk

+44-25
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import random
55
import re
6+
from pathlib import Path
67

78
from legenddataflow.FileKey import ChannelProcKey
89
from legenddataflow.patterns import (
@@ -11,31 +12,42 @@ from legenddataflow.patterns import (
1112
)
1213
from legenddataflow import execenv_pyexe
1314
from legenddataflow.utils import filelist_path
15+
from dbetto import TextDB
16+
from dbetto.catalog import Catalog
1417

1518

16-
# FIXME: the system argument should always be explicitly supplied
17-
def get_chanlist(
18-
setup, keypart, workflow, config, det_status, chan_maps, system="geds"
19-
):
19+
def get_chanlist(config, keypart, workflow, det_status, channelmap, system):
2020
key = ChannelProcKey.parse_keypart(keypart)
2121

22-
flist_path = filelist_path(setup)
23-
os.makedirs(flist_path, exist_ok=True)
24-
output_file = os.path.join(
25-
flist_path,
26-
f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
27-
)
22+
if isinstance(det_status, (str, Path)):
23+
det_status = TextDB(det_status, lazy=True)
2824

29-
os.system(
30-
execenv_pyexe(config, "create-chankeylist")
31-
+ f"--det-status {det_status} --channelmap {chan_maps} --timestamp {key.timestamp} "
32-
f"--datatype {key.datatype} --output-file {output_file} --system {system}"
33-
)
25+
if isinstance(channelmap, (str, Path)):
26+
channelmap = TextDB(channelmap, lazy=True)
27+
28+
if isinstance(det_status, TextDB):
29+
status_map = det_status.statuses.on(key.timestamp, system=key.datatype)
30+
else:
31+
status_map = det_status.valid_for(key.timestamp, system=key.datatype)
32+
if isinstance(channelmap, TextDB):
33+
chmap = channelmap.channelmaps.on(key.timestamp)
34+
else:
35+
chmap = channelmap.valid_for(key.timestamp)
3436

35-
with open(output_file) as r:
36-
chan_list = r.read().splitlines()
37-
os.remove(output_file)
38-
return chan_list
37+
# only restrict to a certain system (geds, spms, ...)
38+
channels = []
39+
for channel in chmap.map("system", unique=False)[system].map("name"):
40+
if channel not in status_map:
41+
msg = f"{channel} is not found in the status map (on {key.timestamp})"
42+
raise RuntimeError(msg)
43+
if status_map[channel].processable is False:
44+
continue
45+
channels.append(channel)
46+
47+
if len(channels) == 0:
48+
print("WARNING: No channels found") # noqa: T201
49+
50+
return channels
3951

4052

4153
def get_par_chanlist(
@@ -45,15 +57,13 @@ def get_par_chanlist(
4557
basedir,
4658
det_status,
4759
chan_maps,
60+
system,
4861
datatype="cal",
49-
system="geds",
5062
name=None,
5163
extension="yaml",
5264
):
5365

54-
chan_list = get_chanlist(
55-
setup, keypart, workflow, config, det_status, chan_maps, system
56-
)
66+
chan_list = get_chanlist(setup, keypart, workflow, det_status, chan_maps, system)
5767

5868
par_pattern = get_pattern_pars_tmp_channel(
5969
setup, tier, name, datatype=datatype, extension=extension
@@ -64,9 +74,18 @@ def get_par_chanlist(
6474
return filenames
6575

6676

67-
def get_plt_chanlist(setup, keypart, tier, basedir, det_status, chan_maps, name=None):
77+
def get_plt_chanlist(
78+
setup,
79+
keypart,
80+
tier,
81+
basedir,
82+
det_status,
83+
chan_maps,
84+
system,
85+
name=None,
86+
):
6887

69-
chan_list = get_chanlist(setup, keypart, workflow, config, det_status, chan_maps)
88+
chan_list = get_chanlist(setup, keypart, workflow, det_status, chan_maps, system)
7089

7190
par_pattern = get_pattern_plts_tmp_channel(setup, tier, name)
7291

workflow/rules/channel_merge.smk

+13-37
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ from legenddataflow.utils import set_last_rule_name
55
from legenddataflow.execenv import execenv_pyexe
66

77

8-
def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
8+
def build_merge_rules(tier, lh5_merge=False, lh5_tier=None, system="geds"):
99
if lh5_tier is None:
1010
lh5_tier = tier
1111
rule:
@@ -15,8 +15,9 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
1515
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
1616
tier,
1717
basedir,
18-
det_status,
19-
chan_maps,
18+
det_status_textdb,
19+
channelmap_textdb,
20+
system=system,
2021
),
2122
output:
2223
patterns.get_pattern_plts(config, tier),
@@ -36,8 +37,9 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
3637
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
3738
tier,
3839
basedir,
39-
det_status,
40-
chan_maps,
40+
det_status_textdb,
41+
channelmap_textdb,
42+
system=system,
4143
name="objects",
4244
extension="pkl",
4345
),
@@ -66,8 +68,9 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
6668
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
6769
tier,
6870
basedir,
69-
det_status,
70-
chan_maps,
71+
det_status_textdb,
72+
channelmap_textdb,
73+
system=system,
7174
),
7275
output:
7376
temp(
@@ -86,34 +89,6 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
8689

8790
set_last_rule_name(workflow, f"build_pars_{tier}_db")
8891

89-
rule:
90-
"""Merge pars for SiPM channels in a single pars file."""
91-
input:
92-
lambda wildcards: get_par_chanlist(
93-
config,
94-
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
95-
tier,
96-
basedir,
97-
det_status,
98-
chan_maps,
99-
datatype=wildcards.datatype,
100-
system="spms"
101-
),
102-
output:
103-
patterns.get_pattern_pars(
104-
config,
105-
tier,
106-
name="spms",
107-
datatype="{datatype}",
108-
),
109-
group:
110-
f"merge-{tier}"
111-
shell:
112-
execenv_pyexe(config, "merge-channels") + \
113-
"--input {input} "
114-
"--output {output} "
115-
116-
set_last_rule_name(workflow, f"build_pars_spms_{tier}_db")
11792

11893
rule:
11994
input:
@@ -122,8 +97,9 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
12297
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
12398
lh5_tier,
12499
basedir,
125-
det_status,
126-
chan_maps,
100+
det_status_textdb,
101+
channelmap_textdb,
102+
system=system,
127103
extension="lh5" if lh5_merge is True else inspect.signature(get_par_chanlist).parameters['extension'].default,
128104
),
129105
in_db=patterns.get_pattern_pars_tmp(

workflow/rules/common.smk

+8-1
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,12 @@ def get_search_pattern(tier):
122122

123123

124124
def get_table_name(metadata, config, datatype, timestamp, detector, tier):
125-
chmap = metadata.channelmap(timestamp, system=datatype)
125+
if isinstance(metadata, (str, Path)):
126+
chmap = metadata.channelmap(timestamp, system=datatype)
127+
elif isinstance(metadata, Catalog):
128+
chmap = metadata.valid_for(timestamp, system=datatype)
129+
else:
130+
raise ValueError(
131+
f"metadata must be a string or a Catalog object, not {type(metadata)}"
132+
)
126133
return config.table_format[tier].format(ch=chmap[detector].daq.rawid)

workflow/rules/dsp_pars_geds.smk

+41-16
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ rule build_pars_dsp_tau_geds:
2727
datatype="cal",
2828
channel="{channel}",
2929
raw_table_name=lambda wildcards: get_table_name(
30-
metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
30+
channelmap_textdb,
31+
config,
32+
"cal",
33+
wildcards.timestamp,
34+
wildcards.channel,
35+
"raw",
3136
),
3237
output:
3338
decay_const=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")),
@@ -57,14 +62,19 @@ rule build_pars_evtsel_geds:
5762
filelist_path(config), "all-{experiment}-{period}-{run}-cal-raw.filelist"
5863
),
5964
pulser_file=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"),
60-
database=get_pattern_pars_tmp_channel(config, "dsp", "decay_constant"),
65+
database=rules.build_pars_dsp_tau_geds.output.decay_const,
6166
raw_cal_curve=get_blinding_curve_file,
6267
params:
6368
timestamp="{timestamp}",
6469
datatype="cal",
6570
channel="{channel}",
6671
raw_table_name=lambda wildcards: get_table_name(
67-
metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
72+
channelmap_textdb,
73+
config,
74+
"cal",
75+
wildcards.timestamp,
76+
wildcards.channel,
77+
"raw",
6878
),
6979
output:
7080
peak_file=temp(
@@ -97,14 +107,19 @@ rule build_pars_dsp_nopt_geds:
97107
files=os.path.join(
98108
filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist"
99109
),
100-
database=get_pattern_pars_tmp_channel(config, "dsp", "decay_constant"),
101-
inplots=get_pattern_plts_tmp_channel(config, "dsp", "decay_constant"),
110+
database=rules.build_pars_dsp_tau_geds.output.decay_const,
111+
inplots=rules.build_pars_dsp_tau_geds.output.plots,
102112
params:
103113
timestamp="{timestamp}",
104114
datatype="cal",
105115
channel="{channel}",
106116
raw_table_name=lambda wildcards: get_table_name(
107-
metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
117+
channelmap_textdb,
118+
config,
119+
"cal",
120+
wildcards.timestamp,
121+
wildcards.channel,
122+
"raw",
108123
),
109124
output:
110125
dsp_pars_nopt=temp(
@@ -137,15 +152,20 @@ rule build_pars_dsp_dplms_geds:
137152
fft_files=os.path.join(
138153
filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist"
139154
),
140-
peak_file=get_pattern_pars_tmp_channel(config, "dsp", "peaks", extension="lh5"),
141-
database=get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization"),
142-
inplots=get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization"),
155+
peak_file=rules.build_pars_evtsel_geds.output.peak_file,
156+
database=rules.build_pars_dsp_nopt_geds.output.dsp_pars_nopt,
157+
inplots=rules.build_pars_dsp_nopt_geds.output.plots,
143158
params:
144159
timestamp="{timestamp}",
145160
datatype="cal",
146161
channel="{channel}",
147162
raw_table_name=lambda wildcards: get_table_name(
148-
metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
163+
channelmap_textdb,
164+
config,
165+
"cal",
166+
wildcards.timestamp,
167+
wildcards.channel,
168+
"raw",
149169
),
150170
output:
151171
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")),
@@ -176,15 +196,20 @@ rule build_pars_dsp_dplms_geds:
176196
# This rule builds the optimal energy filter parameters for the dsp using calibration dsp files
177197
rule build_pars_dsp_eopt_geds:
178198
input:
179-
peak_file=get_pattern_pars_tmp_channel(config, "dsp", "peaks", extension="lh5"),
180-
decay_const=get_pattern_pars_tmp_channel(config, "dsp", "dplms"),
181-
inplots=get_pattern_plts_tmp_channel(config, "dsp", "dplms"),
199+
peak_file=rules.build_pars_evtsel_geds.output.peak_file,
200+
decay_const=rules.build_pars_dsp_dplms_geds.output.dsp_pars,
201+
inplots=rules.build_pars_dsp_dplms_geds.output.plots,
182202
params:
183203
timestamp="{timestamp}",
184204
datatype="cal",
185205
channel="{channel}",
186206
raw_table_name=lambda wildcards: get_table_name(
187-
metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
207+
channelmap_textdb,
208+
config,
209+
"cal",
210+
wildcards.timestamp,
211+
wildcards.channel,
212+
"raw",
188213
),
189214
output:
190215
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")),
@@ -246,8 +271,8 @@ rule build_svm_dsp_geds:
246271

247272
rule build_pars_dsp_svm_geds:
248273
input:
249-
dsp_pars=get_pattern_pars_tmp_channel(config, "dsp_eopt"),
250-
svm_file=get_pattern_pars(config, "dsp", "svm", extension="pkl"),
274+
dsp_pars=rules.build_pars_dsp_eopt_geds.output.dsp_pars,
275+
svm_file=rules.build_svm_dsp_geds.output.dsp_pars,
251276
output:
252277
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp")),
253278
log:

0 commit comments

Comments
 (0)