
Commit 290cfca: pre compile channelmaps and det statuses

1 parent: 5191229

File tree: 7 files changed (+119 −50 lines)

workflow/Snakefile (+5 −1)

@@ -18,10 +18,11 @@ from datetime import datetime
 from collections import OrderedDict
 import logging
 
-from dbetto import AttrsDict
+from dbetto import AttrsDict, TextDB
 from legendmeta import LegendMetadata
 from legenddataflow import CalGrouping, execenv, utils
 from legenddataflow.patterns import get_pattern_tier
+from legenddataflow.pre_compile_catalog import pre_compile_catalog
 
 utils.subst_vars_in_snakemake_config(workflow, config)
 config = AttrsDict(config)
@@ -33,6 +34,9 @@ meta = utils.metadata_path(config)
 det_status = utils.det_status_path(config)
 basedir = workflow.basedir
 
+det_status_textdb = pre_compile_catalog(Path(det_status) / "statuses")
+channelmap_textdb = pre_compile_catalog(Path(chan_maps) / "channelmaps")
+
 time = datetime.now().strftime("%Y%m%dT%H%M%SZ")
 
 # NOTE: this will attempt a clone of legend-metadata, if the directory does not exist
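These catalogs are built once, when Snakemake parses the workflow, and are then shared by every rule below: input lambdas that Snakemake re-evaluates during DAG construction hit an in-memory lookup instead of re-reading validity YAML from disk. A minimal sketch of the resulting query (path and timestamp illustrative, not from the commit):

```python
from pathlib import Path

from legenddataflow.pre_compile_catalog import pre_compile_catalog

# hypothetical checkout path; the real one comes from the dataflow config
det_status = Path("/data/legend-detstatus")

det_status_textdb = pre_compile_catalog(det_status / "statuses")

# the detector-status map valid at this timestamp, served from memory
status_map = det_status_textdb.valid_for("20230101T000000Z", system="cal")
```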

workflow/rules/chanlist_gen.smk (+33 −18)

@@ -3,6 +3,7 @@
 import os
 import random
 import re
+from pathlib import Path
 
 from legenddataflow.FileKey import ChannelProcKey
 from legenddataflow.patterns import (
@@ -11,28 +12,42 @@ from legenddataflow.patterns import (
 )
 from legenddataflow import execenv_pyexe
 from legenddataflow.utils import filelist_path
+from dbetto import TextDB
+from dbetto.catalog import Catalog
 
 
-def get_chanlist(config, keypart, workflow, det_status, chan_maps, system):
+def get_chanlist(config, keypart, workflow, det_status, channelmap, system):
     key = ChannelProcKey.parse_keypart(keypart)
 
-    flist_path = filelist_path(config)
-    os.makedirs(flist_path, exist_ok=True)
-    output_file = os.path.join(
-        flist_path,
-        f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
-    )
-
-    os.system(
-        execenv_pyexe(config, "create-chankeylist")
-        + f"--det-status {det_status} --channelmap {chan_maps} --timestamp {key.timestamp} "
-        f"--datatype {key.datatype} --output-file {output_file} --system {system}"
-    )
-
-    with open(output_file) as r:
-        chan_list = r.read().splitlines()
-    os.remove(output_file)
-    return chan_list
+    if isinstance(det_status, (str, Path)):
+        det_status = TextDB(det_status, lazy=True)
+
+    if isinstance(channelmap, (str, Path)):
+        channelmap = TextDB(channelmap, lazy=True)
+
+    if isinstance(det_status, TextDB):
+        status_map = det_status.statuses.on(key.timestamp, system=key.datatype)
+    else:
+        status_map = det_status.valid_for(key.timestamp, system=key.datatype)
+    if isinstance(channelmap, TextDB):
+        chmap = channelmap.channelmaps.on(key.timestamp)
+    else:
+        chmap = channelmap.valid_for(key.timestamp)
+
+    # only restrict to a certain system (geds, spms, ...)
+    channels = []
+    for channel in chmap.map("system", unique=False)[system].map("name"):
+        if channel not in status_map:
+            msg = f"{channel} is not found in the status map (on {key.timestamp})"
+            raise RuntimeError(msg)
+        if status_map[channel].processable is False:
+            continue
+        channels.append(channel)
+
+    if len(channels) == 0:
+        print("WARNING: No channels found")  # noqa: T201
+
+    return channels
 
 
 def get_par_chanlist(
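`get_chanlist` now resolves the channel list entirely in-process: it accepts either on-disk metadata paths (wrapped in a lazy `TextDB`) or the precompiled `Catalog` objects from the Snakefile, rather than shelling out to `create-chankeylist` and round-tripping through a temporary file. A usage sketch (the keypart string is illustrative):

```python
# det_status_textdb / channelmap_textdb are the precompiled Catalogs
# built at the top of the Snakefile
channels = get_chanlist(
    config,
    "all-l200-p03-r000-cal-20230101T000000Z-channels",  # illustrative keypart
    workflow,
    det_status_textdb,
    channelmap_textdb,
    system="geds",
)
# -> list of detector names flagged processable at that timestamp
```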

workflow/rules/channel_merge.smk (+8 −8)

@@ -15,8 +15,8 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None, system="geds"):
             f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
             tier,
             basedir,
-            det_status,
-            chan_maps,
+            det_status_textdb,
+            channelmap_textdb,
             system=system,
         ),
     output:
@@ -37,8 +37,8 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None, system="geds"):
             f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
             tier,
             basedir,
-            det_status,
-            chan_maps,
+            det_status_textdb,
+            channelmap_textdb,
             system=system,
             name="objects",
             extension="pkl",
@@ -68,8 +68,8 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None, system="geds"):
             f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
             tier,
             basedir,
-            det_status,
-            chan_maps,
+            det_status_textdb,
+            channelmap_textdb,
             system=system,
         ),
     output:
@@ -97,8 +97,8 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None, system="geds"):
             f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-cal-{wildcards.timestamp}-channels",
             lh5_tier,
             basedir,
-            det_status,
-            chan_maps,
+            det_status_textdb,
+            channelmap_textdb,
             system=system,
             extension="lh5" if lh5_merge is True else "yaml",
         ),
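`build_merge_rules` is invoked once per tier to emit the merge rules, so the four input lambdas above now all share the same two precompiled catalogs instead of re-reading metadata per wildcard combination. A hypothetical invocation, mirroring the signature shown above:

```python
# generate the merge rules for the dsp tier, merging par files into LH5
build_merge_rules("dsp", lh5_merge=True, lh5_tier="dsp", system="geds")
```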

workflow/rules/common.smk (+4 −1)

@@ -122,5 +122,8 @@ def get_search_pattern(tier):
 
 
 def get_table_name(metadata, config, datatype, timestamp, detector, tier):
-    chmap = metadata.channelmap(timestamp, system=datatype)
+    if isinstance(metadata, Catalog):
+        chmap = metadata.valid_for(timestamp, system=datatype)
+    else:
+        chmap = metadata.channelmap(timestamp, system=datatype)
     return config.table_format[tier].format(ch=chmap[detector].daq.rawid)
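`get_table_name` thus accepts either a `LegendMetadata`-style object (queried via `.channelmap()`) or a precompiled `Catalog` (queried via `.valid_for()`). An illustrative call, with a made-up detector name and timestamp:

```python
raw_table = get_table_name(
    channelmap_textdb,  # precompiled Catalog from the Snakefile
    config,
    "cal",
    "20230101T000000Z",
    "V05266A",  # hypothetical detector name
    "raw",
)
# formats config.table_format["raw"] with the detector's DAQ rawid
```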

workflow/rules/dsp_pars_geds.smk (+41 −16)

@@ -27,7 +27,12 @@ rule build_pars_dsp_tau_geds:
         datatype="cal",
         channel="{channel}",
         raw_table_name=lambda wildcards: get_table_name(
-            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+            channelmap_textdb,
+            config,
+            "cal",
+            wildcards.timestamp,
+            wildcards.channel,
+            "raw",
         ),
     output:
         decay_const=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")),
@@ -57,14 +62,19 @@ rule build_pars_evtsel_geds:
             filelist_path(config), "all-{experiment}-{period}-{run}-cal-raw.filelist"
         ),
         pulser_file=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"),
-        database=get_pattern_pars_tmp_channel(config, "dsp", "decay_constant"),
+        database=rules.build_pars_dsp_tau_geds.output.decay_const,
         raw_cal_curve=get_blinding_curve_file,
     params:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
         raw_table_name=lambda wildcards: get_table_name(
-            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+            channelmap_textdb,
+            config,
+            "cal",
+            wildcards.timestamp,
+            wildcards.channel,
+            "raw",
         ),
     output:
         peak_file=temp(
@@ -97,14 +107,19 @@ rule build_pars_dsp_nopt_geds:
         files=os.path.join(
             filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist"
         ),
-        database=get_pattern_pars_tmp_channel(config, "dsp", "decay_constant"),
-        inplots=get_pattern_plts_tmp_channel(config, "dsp", "decay_constant"),
+        database=rules.build_pars_dsp_tau_geds.output.decay_const,
+        inplots=rules.build_pars_dsp_tau_geds.output.plots,
     params:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
         raw_table_name=lambda wildcards: get_table_name(
-            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+            channelmap_textdb,
+            config,
+            "cal",
+            wildcards.timestamp,
+            wildcards.channel,
+            "raw",
         ),
     output:
         dsp_pars_nopt=temp(
@@ -137,15 +152,20 @@ rule build_pars_dsp_dplms_geds:
         fft_files=os.path.join(
             filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist"
        ),
-        peak_file=get_pattern_pars_tmp_channel(config, "dsp", "peaks", extension="lh5"),
-        database=get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization"),
-        inplots=get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization"),
+        peak_file=rules.build_pars_evtsel_geds.output.peak_file,
+        database=rules.build_pars_dsp_nopt_geds.output.dsp_pars_nopt,
+        inplots=rules.build_pars_dsp_nopt_geds.output.plots,
     params:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
         raw_table_name=lambda wildcards: get_table_name(
-            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+            channelmap_textdb,
+            config,
+            "cal",
+            wildcards.timestamp,
+            wildcards.channel,
+            "raw",
         ),
     output:
         dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")),
@@ -176,15 +196,20 @@
 # This rule builds the optimal energy filter parameters for the dsp using calibration dsp files
 rule build_pars_dsp_eopt_geds:
     input:
-        peak_file=get_pattern_pars_tmp_channel(config, "dsp", "peaks", extension="lh5"),
-        decay_const=get_pattern_pars_tmp_channel(config, "dsp", "dplms"),
-        inplots=get_pattern_plts_tmp_channel(config, "dsp", "dplms"),
+        peak_file=rules.build_pars_evtsel_geds.output.peak_file,
+        decay_const=rules.build_pars_dsp_dplms_geds.output.dsp_pars,
+        inplots=rules.build_pars_dsp_dplms_geds.output.plots,
     params:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
         raw_table_name=lambda wildcards: get_table_name(
-            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+            channelmap_textdb,
+            config,
+            "cal",
+            wildcards.timestamp,
+            wildcards.channel,
+            "raw",
         ),
     output:
         dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")),
@@ -246,8 +271,8 @@ rule build_svm_dsp_geds:
 
 rule build_pars_dsp_svm_geds:
     input:
-        dsp_pars=get_pattern_pars_tmp_channel(config, "dsp_eopt"),
-        svm_file=get_pattern_pars(config, "dsp", "svm", extension="pkl"),
+        dsp_pars=rules.build_pars_dsp_eopt_geds.output.dsp_pars,
+        svm_file=rules.build_svm_dsp_geds.output.dsp_pars,
     output:
         dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp")),
     log:
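Throughout this file, inputs switch from re-deriving temp-file patterns to referencing the producing rule's named outputs through Snakemake's `rules` object, which keeps each path defined in exactly one place and makes the dependency explicit. A minimal, self-contained sketch of the pattern (rule and file names hypothetical):

```python
rule make_constants:
    output:
        decay_const="pars/{channel}-decay_constant.yaml",
        plots="plts/{channel}-decay_constant.pkl",
    shell:
        "touch {output.decay_const} {output.plots}"


rule use_constants:
    input:
        # resolves to the same path string as make_constants' decay_const output
        database=rules.make_constants.output.decay_const,
    output:
        "pars/{channel}-dsp.yaml",
    shell:
        "cp {input.database} {output}"
```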

workflow/rules/dsp_pars_spms.smk (+6 −6)

@@ -17,13 +17,13 @@ rule build_pars_dsp_tau_spms:
            config,
            f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
            workflow,
-            det_status,
-            chan_maps,
+            det_status_textdb,
+            channelmap_textdb,
            system="spms",
        ),
        raw_table_names=lambda wildcards: [
            get_table_name(
-                metadata,
+                channelmap_textdb,
                config,
                wildcards.datatype,
                wildcards.timestamp,
@@ -34,8 +34,8 @@
                config,
                f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
                workflow,
-                det_status,
-                chan_maps,
+                det_status_textdb,
+                channelmap_textdb,
                system="spms",
            )
        ],
@@ -44,7 +44,7 @@
    output:
        patt.get_pattern_pars(config, "dsp", name="spms", datatype="{datatype}"),
    log:
-        patt.get_pattern_log(config, "pars_spms", time),
+        patt.get_pattern_log(config, "pars_spms", time, datatype="{datatype}"),
    group:
        "par-dsp"
    shell:
legenddataflow/pre_compile_catalog.py (new file, +22)

@@ -0,0 +1,22 @@
+from datetime import datetime, timezone
+from pathlib import Path
+
+from dbetto import TextDB
+from dbetto.catalog import Catalog
+
+
+def pre_compile_catalog(validity_path: str | Path):
+    if isinstance(validity_path, str):
+        validity_path = Path(validity_path)
+    catalog = Catalog.read_from(validity_path / "validity.yaml")
+    entries = {}
+    textdb = TextDB(validity_path, lazy=False)
+    for system in catalog.entries:
+        entries[system] = []
+        for entry in catalog.entries[system]:
+            db = textdb.on(
+                datetime.fromtimestamp(entry.valid_from, tz=timezone.utc), system=system
+            )
+            new_entry = Catalog.Entry(entry.valid_from, db)
+            entries[system].append(new_entry)
+    return Catalog(entries)
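The function walks every validity entry, resolves the metadata state at that entry's `valid_from` timestamp with `TextDB.on(...)`, and stores the loaded result back into a fresh `Catalog`. Downstream code (see `get_chanlist` above) then treats the two flavours uniformly: a lazy `TextDB` is queried with `.on(timestamp)` and resolves validity at call time, while a precompiled `Catalog` answers `.valid_for(timestamp)` from entries that already hold the loaded metadata. A sketch of the contrast (path and timestamp illustrative):

```python
from pathlib import Path

from dbetto import TextDB
from legenddataflow.pre_compile_catalog import pre_compile_catalog

# hypothetical metadata checkout
chan_maps = Path("/data/legend-metadata/hardware/configuration")

# lazy: validity files are read and parsed at every query
chmap = TextDB(chan_maps, lazy=True).channelmaps.on("20230101T000000Z")

# precompiled: the same lookup against entries resolved once up front
chmap = pre_compile_catalog(chan_maps / "channelmaps").valid_for("20230101T000000Z")
```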
