FF_parser · executable file · 229 lines (197 loc) · 7.73 KB
#!/usr/bin/env python
import sys
import os
lib_path = '%s/libraries/' % os.path.dirname(os.path.abspath(__file__))
sys.path.append(lib_path)
import functions as func
import argparse as ap
import configparser
import pycbc
import numpy as np
import time
import Pegasus.DAX3 as dax
import pycbc.workflow as wf
import pycbc.workflow.pegasus_workflow as wdax
from pycbc.workflow import WorkflowConfigParser
import h5py
import math
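# Read the analysis settings from the workflow configuration file. The helper
# below expects a [Required] section (tau0_tolerance, nsplits, nworkflow) and a
# [Files] section (template_bank, inj_file, psd_file), e.g. (illustrative values):
#
#   [Required]
#   tau0_tolerance = 2.0
#   nsplits = 4
#   nworkflow = 2
#
#   [Files]
#   template_bank = /path/to/bank.hdf
#   inj_file = /path/to/injections.hdf
#   psd_file = /path/to/psd.txt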
def determine_values_from_parsing_config(config):
    tau0_tolerance = float(config['Required']['tau0_tolerance'])
    nsplits = int(config['Required']['nsplits'])
    nworkflow = int(config['Required']['nworkflow'])
    template_bank = config['Files']['template_bank']
    inj_file = config['Files']['inj_file']
    psd_file = config['Files']['psd_file']
    return tau0_tolerance, nsplits, nworkflow, template_bank, inj_file, psd_file
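# Lightweight container pairing an injection index with the list of
# template-bank splits that still need to be run for it.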
class injs_and_tbsplits:
    def __init__(self, inj, split_array):
        self.inj = inj
        self.split_array = split_array
def to_file(path, ifo=None):
    """ Takes a str and returns a pycbc.workflow.pegasus_workflow.File
    instance.
    """
    fil = pycbc.workflow.core.resolve_url_to_file(path)
    return fil
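# Read previously computed per-injection template indices back from the HDF
# file written by save_indices_to_file (one dataset per injection, keyed by
# the injection number).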
def load_template_indices(indices_filename):
    indices = []
    with h5py.File(indices_filename, 'r') as hf:
        # Datasets are keyed '0', '1', ...; sort numerically so the list order
        # matches the injection numbering rather than alphabetical key order.
        for inj_ind in sorted(hf.keys(), key=int):
            indices.append(hf[inj_ind][:])
    # dtype=object because the per-injection index arrays can differ in length
    return np.array(indices, dtype=object)
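# Write the per-injection template indices to an HDF file, one dataset per
# injection, keyed by the injection number.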
def save_indices_to_file(filename, indices):
    with h5py.File(filename, 'w') as f:
        for k in range(len(indices)):
            key = '%s' % k
            f.create_dataset(key, data=indices[k])
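# Binary search (via numpy.searchsorted) for the index of the entry closest to
# the given value; assumes the input array is sorted in ascending order.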
def find_nearest(array, value):
    idx = np.searchsorted(array, value, side="left")
    if idx > 0 and (idx == len(array) or math.fabs(value - array[idx-1]) < math.fabs(value - array[idx])):
        return idx-1
    else:
        return idx
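# Flag injections whose tau0 falls outside the tau0 range covered by the
# template bank; these are skipped when building the template indices.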
def quick_check_inj_and_template_taus(tb_tau0, sg):
    injs_outside = []
    for k in range(len(sg)):
        if sg[k].tau0 < tb_tau0.min() or sg[k].tau0 > tb_tau0.max():
            print('Injection no. ', k, 'is outside the template bank coverage \n \t use injections within the region')
            injs_outside.append(k)
    print('\t Total ', len(injs_outside), 'injections outside the template bank')
    return injs_outside
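# For every injection, collect the indices of bank templates whose tau0 lies
# within +/- tau0_tolerance of the injection's tau0, and cache them in
# indices/template_indices.hdf under the current working directory.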
def find_template_indices(tb_tau0, tau0_tolerance, inj_file):
    sg = func.read_injections(inj_file)
    indices_save_dir = os.getcwd() + "/indices"
    injs_outside = quick_check_inj_and_template_taus(tb_tau0, sg)
    if not os.path.isdir(indices_save_dir):
        sys.exit('Please create an "indices" directory in the workflow directory')
    else:
        indices = []
        for k in range(len(sg)):
            if k in injs_outside:
                indices.append([])
            else:
                low = sg[k].tau0 - tau0_tolerance
                low_ind = find_nearest(tb_tau0, low)
                high = sg[k].tau0 + tau0_tolerance
                high_ind = find_nearest(tb_tau0, high)
                indices.append(np.array(range(low_ind, high_ind+1)))
        filename = indices_save_dir + "/template_indices.hdf"
        save_indices_to_file(filename, indices)
        return indices, filename
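# Scan results/ for existing output files (results/H1-FF_<inj>_<split>-0-10.hdf)
# and collect the (injection, split) combinations that still need to be run.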
def check_for_unfinished_jobs(first, last, nsplits):
    unfinished_jobs = []
    for k in range(first, last):
        split_array = []
        for t in range(nsplits):
            filename = "results/H1-FF_%s_%s-0-10.hdf" % (k, t)
            if not os.path.isfile(filename):
                split_array.append(t)
        if split_array != []:
            temp_obj = injs_and_tbsplits(k, split_array)
            unfinished_jobs.append(temp_obj)
    return unfinished_jobs
# Command line parser
parser = ap.ArgumentParser()
parser.add_argument("--output_dir",
                    #default="output/",
                    help="Path to output directory.")
parser.add_argument('--first',
                    required=True, type=int,
                    help='specify starting index of workflow loop (injection file index)')
parser.add_argument('--last',
                    required=True, type=int,
                    help='specify ending index of workflow loop (injection file index)')
parser.add_argument('--cache_file',
                    help='Cache file')
parser.add_argument('--submit_now')
parser.add_argument('--plan_now')
#parser.add_argument('--tau0_tolerance',
#                    required=True, type=float,
#                    help='specify a constant tau0_tolerance to find the template indices near an injection')
#parser.add_argument('--f_min',
#                    required=True, type=float,
#                    help='specify lower frequency for the analysis')
#parser.add_argument('--nsplits',
#                    required=True, type=int,
#                    help='Number of jobs per injection (will split the local tb into these many parts)')
#parser.add_argument('--nworkflow',
#                    required=True, type=int,
#                    help='Specify number of workflows')
pycbc.init_logging(True)
# add option groups
wf.add_workflow_command_line_group(parser)
# Parse the command line
args, remaining_args = parser.parse_known_args()
config = configparser.ConfigParser()
config.read(args.config_files[0])
tau0_tolerance, nsplits, nworkflow, template_bank, inj_file, psd_file = determine_values_from_parsing_config(config)
first = args.first
last = args.last
print('\n \t \t Check before workflow is submitted \n \t Template bank - ', template_bank,
      ' \n \t Injection file - ', inj_file, ' \n \t tau0_tolerance =', tau0_tolerance,
      ' \n \t No. of workflows', nworkflow, '\n \t TB splits per injection', nsplits,
      '\n \t PSD', psd_file, '\n')
# Check for unfinished jobs
unfinished_jobs = check_for_unfinished_jobs(first, last, nsplits)
injection_indices = [x.inj for x in unfinished_jobs]
# Read and transfer TB only once
start = time.time()
tb = func.read_tb(template_bank)
tb_tau0 = np.array([x.tau0 for x in tb])
end = time.time()
print('Time taken to read the template bank', end-start)
main_dir = os.getcwd()
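# Build one workflow per part: each workflow gets its own part_<n> working
# directory, a share of the unfinished (injection, split) jobs, one FF node
# per unfinished template-bank split, and its own DAX file.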
for wf_ind in range(nworkflow):
    # Create workflow
    wfname = "gw_%s" % wf_ind
    workflow = wf.Workflow(args, wfname)
    existing_run = False
    # Create the working (output) directory and start working in that dir
    working_dir = "%s/part_%s" % (main_dir, wf_ind)
    if os.path.isdir(working_dir):
        print("======== Existing run detected =======")
    wf.makedir(working_dir)
    wf.makedir("%s/indices" % working_dir)
    os.chdir(working_dir)
    workflow.out_dir = working_dir
    print("Changing directory to", working_dir, 'Out_dir', workflow.out_dir)
    # Executable -- compute_FF
    FF_exe = wf.Executable(workflow.cp, "FF", ifos=workflow.ifos, out_dir="../results/")
    tbhand = to_file(template_bank)
    psdhand = to_file(psd_file)
    # Injections for the current workflow
    jobs_to_exe = np.array_split(unfinished_jobs, nworkflow)[wf_ind]
    inj_to_exe = [x.inj for x in jobs_to_exe]
    np.savetxt('injs_executing.txt', inj_to_exe)
    # Compute template indices around all the injections
    #if (os.path.isfile('template_indices.hdf')==True): ##Hard-coded filename
    #    print('Template indices already computed')
    #    indices_filename = os.getcwd() + "/indices/template_indices.hdf"
    #    all_indices = load_template_indices(indices_filename)
    #else :
    all_indices, indices_filename = find_template_indices(tb_tau0, tau0_tolerance, inj_file)
    for job in jobs_to_exe:
        # Get template indices
        num_inj = job.inj
        indices = all_indices[num_inj]
        # File handles
        injhand = to_file(inj_file)
        indiceshand = to_file(indices_filename)
        print('Analyzing injections', inj_file, '\n', len(indices), 'templates around injection no.', num_inj)
        # TB splits for the current injection
        split_array = job.split_array  # num_inj should be index
        for split_ind in split_array:
            indices = all_indices[num_inj]
            template_indices = np.array_split(indices, nsplits)[split_ind]
            node = FF_exe.create_node()
            node.add_input_opt('--template_bank', tbhand)
            node.add_input_opt('--injections', injhand)
            node.add_input_opt('--psd_file', psdhand)
            node.add_input_opt('--indices_file', indiceshand)
            node.add_opt('--sg_ind', num_inj)
            node.add_opt('--nsplits', nsplits)
            node.add_opt('--split_ind', split_ind)
            filetag = "%s_%s" % (num_inj, split_ind)
            FF_file = node.new_output_file_opt(workflow.analysis_time, ".hdf", "--output_file", tags=[filetag])
            workflow += node
    # Write the DAX for this workflow
    daxfile = "gw_%s.dax" % wf_ind
    workflow.save(daxfile)
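# Typical invocation (a sketch; the configuration option is supplied by pycbc's
# workflow command-line group, usually --config-files, and ff_config.ini is an
# illustrative filename):
#   ./FF_parser --first 0 --last 50 --config-files ff_config.ini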