
Commit 96cab8f

Merge pull request #110 from legend-exp/dev: "Some improvements"

2 parents: 0249ec2 + ce22864

In short: fixes the analysis_runs lookup order in filelist generation, simplifies build_log() and gives it a fallback logger name, skips missing input tables in the dsp and hit tier builders, and drops the temporary-output indirection (and timing log) around build_dsp().

4 files changed: +50 −53 lines

workflow/rules/filelist_gen.smk (+8 −10)

@@ -163,8 +163,10 @@ def build_filelist(
     ignore_keys = []
     if analysis_runs is None:
         analysis_runs = {}
+
     phy_filenames = []
     other_filenames = []
+
     for key in filekeys:
         if Path(search_pattern).suffix == ".*":
             search_pattern = Path(search_pattern).with_suffix(".{ext}")

@@ -198,19 +200,15 @@ def build_filelist(
             else:
                 other_filenames += filename
         else:
+            # check if period in analysis_runs dicts
+            # check if run in analysis_runs dicts
+            # or if runs is just specified as "all"
             if (
                 _key.datatype in analysis_runs
-                and _key.period
-                in analysis_runs[
-                    _key.datatype
-                ]  # check if period in analysis_runs dicts
+                and _key.period in analysis_runs[_key.datatype]
                 and (
-                    _key.run
-                    in analysis_runs[_key.period][
-                        _key.datatype
-                    ]  # check if run in analysis_runs dicts
-                    or analysis_runs[_key.period][_key.datatype]
-                    == "all"  # or if runs is just specified as "all"
+                    _key.run in analysis_runs[_key.datatype][_key.period]
+                    or analysis_runs[_key.datatype][_key.period] == "all"
                )
            ):
                if _key.datatype in concat_datatypes:
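
The rewrite above also fixes an inconsistency: the old condition indexed analysis_runs[_key.period][_key.datatype] for the run lookup but analysis_runs[_key.datatype] for the period lookup. A minimal sketch of the datatype → period → runs nesting the new condition assumes (all keys and values here are hypothetical):

```python
# Hypothetical analysis_runs layout matching the new lookup order:
# datatype -> period -> list of runs, or the string "all".
analysis_runs = {
    "phy": {
        "p03": ["r000", "r001"],
        "p04": "all",
    }
}

datatype, period, run = "phy", "p04", "r002"  # made-up key fields

# same membership test as the rewritten condition in build_filelist()
selected = (
    datatype in analysis_runs
    and period in analysis_runs[datatype]
    and (
        run in analysis_runs[datatype][period]
        or analysis_runs[datatype][period] == "all"
    )
)
print(selected)  # True: "all" selects every run in period p04
```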

workflow/src/legenddataflow/log.py (+14 −20)

@@ -5,42 +5,36 @@
 from dbetto import Props
 
 
-def build_log(config_dict: dict, log_file: str | None = None) -> logging.Logger:
-    """
-    Build a logger from a configuration dictionary.
+def build_log(
+    config_dict: dict, log_file: str | None = None, fallback: str = "prod"
+) -> logging.Logger:
+    """Build a logger from a configuration dictionary.
+
     If a log file is provided, the logger will write to that file.
 
     Parameters
     ----------
-
-    config_dict : dict
+    config_dict
         A dictionary containing the logging configuration.
-    log_file : str, optional
+    log_file
         The path to the log file.
-
-    Returns
-    -------
-    Logger
-        The logger object.
-
     """
     if "logging" in config_dict["options"]:
         log_config = config_dict["options"]["logging"]
         log_config = Props.read_from(log_config)
+
         if log_file is not None:
             Path(log_file).parent.mkdir(parents=True, exist_ok=True)
-            log_config["handlers"]["dynamic"] = {
-                "class": "logging.FileHandler",
-                "level": "DEBUG",
-                "formatter": "simple",
-                "filename": log_file,
-                "mode": "a",
-            }
+            log_config["handlers"]["dataflow"]["filename"] = log_file
+
         dictConfig(log_config)
         log = logging.getLogger(config_dict["options"].get("logger", "prod"))
+
     else:
         if log_file is not None:
             Path(log_file).parent.mkdir(parents=True, exist_ok=True)
             logging.basicConfig(level=logging.INFO, filename=log_file, filemode="w")
-        log = logging.getLogger(__name__)
+
+        log = logging.getLogger(fallback)
+
     return log
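
A hedged usage sketch of the new signature (the import path follows the repository layout; file paths and config contents are made up). Note that the config-driven branch now assumes the dictConfig file defines a handler named "dataflow", since build_log() patches its filename in place instead of injecting a separate "dynamic" handler:

```python
# Sketch only: paths and config contents are assumptions, not repo fixtures.
from legenddataflow.log import build_log

# config-driven branch: "logging" points at a dictConfig-style file that
# must define a "dataflow" handler to receive the log file name
config_dict = {"options": {"logging": "config/logging.yaml", "logger": "prod"}}
log = build_log(config_dict, log_file="logs/tier_dsp.log")

# fallback branch: no "logging" key, so basicConfig() is used and the
# logger is looked up under the new `fallback` name
config_dict = {"options": {}}
log = build_log(config_dict, log_file="logs/tier_dsp.log", fallback=__name__)
log.info("logger ready")
```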

workflow/src/legenddataflow/scripts/tier/dsp.py (+18 −21)

@@ -1,6 +1,5 @@
 import argparse
 import re
-import time
 from pathlib import Path
 
 import numpy as np

@@ -57,7 +56,7 @@ def build_tier_dsp() -> None:
         msg = f"tier {args.tier} not supported"
         raise ValueError(msg)
 
-    log = build_log(config_dict, args.log)
+    log = build_log(config_dict, args.log, fallback=__name__)
 
     settings_dict = config_dict.options.get("settings", {})
     if isinstance(settings_dict, str):

@@ -77,14 +76,21 @@ def build_tier_dsp() -> None:
     }
 
     # now construct the dictionary of DSP configs for build_dsp()
-    channel_dict = {}
+    dsp_cfg_tbl_dict = {}
     for chan, file in chan_cfg_map.items():
         if chan_map[chan].analysis.processable is False:
             msg = f"channel {chan} is set to non-processable in the channel map"
             raise RuntimeError(msg)
 
         tbl = "dsp" if args.tier in ["ann", "pan"] else "raw"
-        channel_dict[f"ch{chan_map[chan].daq.rawid:07}/{tbl}"] = Props.read_from(file)
+        input_tbl_name = f"ch{chan_map[chan].daq.rawid:07}/{tbl}"
+
+        # check if the raw tables are all existing
+        if len(lh5.ls(args.input, input_tbl_name)) > 0:
+            dsp_cfg_tbl_dict[input_tbl_name] = Props.read_from(file)
+        else:
+            msg = f"table {input_tbl_name} not found in {args.input} skipping"
+            log.warning(msg)
 
     # par files
     db_files = [

@@ -93,36 +99,27 @@ def build_tier_dsp() -> None:
         if Path(par_file).suffix in (".json", ".yaml", ".yml")
     ]
 
-    database_dic = _replace_list_with_array(
+    database_dict = _replace_list_with_array(
         Props.read_from(db_files, subst_pathvar=True)
     )
-    database_dic = {
+    database_dict = {
         (f"ch{chan_map[chan].daq.rawid:07}" if chan in chan_map else chan): dic
-        for chan, dic in database_dic.items()
+        for chan, dic in database_dict.items()
     }
 
     Path(args.output).parent.mkdir(parents=True, exist_ok=True)
 
-    rng = np.random.default_rng()
-    rand_num = f"{rng.integers(0, 99999):05d}"
-    temp_output = f"{args.output}.{rand_num}"
-
-    start = time.time()
-
     build_dsp(
         args.input,
-        temp_output,
+        args.output,
         {},
-        database=database_dic,
-        chan_config=channel_dict,
+        database=database_dict,
+        chan_config=dsp_cfg_tbl_dict,
         write_mode="r",
         buffer_len=settings_dict.get("buffer_len", 1000),
         block_width=settings_dict.get("block_width", 16),
     )
 
-    log.info(f"build_dsp finished in {time.time()-start}")
-    Path(temp_output).rename(args.output)
-
     key = Path(args.output).name.replace(f"-tier_{args.tier}.lh5", "")
 
     if args.tier in ["dsp", "psp"]:

@@ -136,7 +133,7 @@ def build_tier_dsp() -> None:
 
         outputs = {}
         channels = []
-        for channel, chan_dict in channel_dict.items():
+        for channel, chan_dict in dsp_cfg_tbl_dict.items():
            output = chan_dict["outputs"]
            in_dict = False
            for entry in outputs:

@@ -162,7 +159,7 @@ def build_tier_dsp() -> None:
    else:
        outputs = {}
        channels = []
-        for channel, chan_dict in channel_dict.items():
+        for channel, chan_dict in dsp_cfg_tbl_dict.items():
            output = chan_dict["outputs"]
            in_dict = False
            for entry in outputs:
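
The new guard relies on lgdo's lh5.ls() returning the object names matching a given path inside an LH5 file. A standalone sketch of the same pattern (file and table names here are hypothetical):

```python
# Hypothetical file/table names; the guard mirrors the one added above.
from lgdo import lh5

input_file = "l200-p03-r000-cal-tier_raw.lh5"  # made up
input_tbl_name = "ch1234567/raw"               # made up

# lh5.ls() returns a list of matching object names; an empty list means
# the table is absent, so the channel is skipped instead of crashing
# build_dsp() on a missing input.
if len(lh5.ls(input_file, input_tbl_name)) > 0:
    print(f"processing {input_tbl_name}")
else:
    print(f"table {input_tbl_name} not found in {input_file}, skipping")
```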

workflow/src/legenddataflow/scripts/tier/hit.py (+10 −2)

@@ -3,6 +3,7 @@
 
 from dbetto.catalog import Props
 from legendmeta import LegendMetadata, TextDB
+from lgdo import lh5
 from pygama.hit.build_hit import build_hit
 
 from ...log import build_log

@@ -34,7 +35,7 @@ def build_tier_hit() -> None:
         .on(args.timestamp, system=args.datatype)
         .snakemake_rules.tier_hit
     )
-    log = build_log(df_config, args.log)
+    log = build_log(df_config, args.log, fallback=__name__)
     log.info("initializing")
 
     settings_dict = df_config.options.get("settings", {})

@@ -71,7 +72,14 @@ def build_tier_hit() -> None:
         # get pars (to override hit config)
         Props.add_to(hit_cfg, pars_dict.get(chan, {}).copy())
 
-        channel_dict[f"ch{chan_map[chan].daq.rawid}/dsp"] = hit_cfg
+        input_tbl_name = f"ch{chan_map[chan].daq.rawid}/dsp"
+
+        # check if the raw tables are all existing
+        if len(lh5.ls(args.input, input_tbl_name)) > 0:
+            channel_dict[input_tbl_name] = hit_cfg
+        else:
+            msg = f"table {input_tbl_name} not found in {args.input} skipping"
+            log.warning(msg)
 
     log.info("running build_hit()...")
     Path(args.output).parent.mkdir(parents=True, exist_ok=True)
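
For context on the unchanged Props.add_to() line in the last hunk: it merges the per-channel parameter dict into the hit config before the new table-existence check, so calibration pars override the static config. A sketch under the assumption that the merge is a recursive in-place dict update (config shape follows pygama build_hit conventions; values are invented):

```python
# Assumed behavior of dbetto's Props.add_to(): recursive in-place merge,
# second argument wins. All values below are invented for illustration.
from dbetto.catalog import Props

hit_cfg = {
    "outputs": ["cuspEmax_cal"],
    "operations": {
        "cuspEmax_cal": {
            "expression": "a * cuspEmax + b",
            "parameters": {"a": 1.0, "b": 0.0},
        }
    },
}
pars = {"operations": {"cuspEmax_cal": {"parameters": {"a": 0.998, "b": 0.12}}}}

Props.add_to(hit_cfg, pars)
print(hit_cfg["operations"]["cuspEmax_cal"]["parameters"])
# expected: {'a': 0.998, 'b': 0.12}
```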
