-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathnway-apitest.py
194 lines (173 loc) · 6.79 KB
/
nway-apitest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, division
__doc__ = """Multiway association between astrometric catalogue. Use --help for usage.
Example: nway.py --radius 10 --prior-completeness 0.95 --mag GOODS:mag_H auto --mag IRAC:mag_irac1 auto cdfs4Ms_srclist_v3.fits :Pos_error CANDELS_irac1.fits 0.5 gs_short.fits 0.1 --out=out.fits
"""
import sys
import numpy
from numpy import log10, pi, exp, logical_and
import astropy.io.fits as pyfits
import argparse
import nwaylib.logger as logger
import nwaylib.fastskymatch as match
import nwaylib.bayesdistance as bayesdist
import nwaylib.magnitudeweights as magnitudeweights
import nwaylib
# set up program arguments
def table_from_fits(fitsname, poserr_value=None, area=None, magnitude_columns=[]):
fits_table = pyfits.open(fitsname)[1]
table_name = fits_table.name
ra = fits_table.data['RA']
dec = fits_table.data['DEC']
if 'pos_err' in fits_table.data.columns.names:
poserr = fits_table.data['pos_err']
else:
assert poserr_value is not None, ('"pos_err" column not found in file "%s", and no poserr_value passed' % fitsname)
poserr = poserr_value * numpy.ones(len(ra))
if area is None:
area = fits_table.header['SKYAREA'] * 1.0
# magnitude columns
mags = []
maghists = []
magnames = []
#for mag in magnitude_columns:
for col_name, magfile in magnitude_columns:
assert col_name in fits_table.data.dtype.names
mag_all = fits_table.data[col_name]
# mark -99 as undefined
mag_all[mag_all == -99] = numpy.nan
mags.append(mag_all)
magnames.append(col_name)
if magfile == 'auto':
maghists.append(None)
else:
bins_lo, bins_hi, hist_sel, hist_all = numpy.loadtxt(magfile).transpose()
maghists.append((bins_lo, bins_hi, hist_sel, hist_all))
return dict(name=table_name, ra=ra, dec=dec, error=poserr, area=area, mags=mags, maghists=maghists, magnames=magnames)
# area in square degrees
# error in arcsec
# ra/dec in degrees
# mag: column of something
# maghists: either (bin, sel, all) tuple or None (for auto)
result = nwaylib.nway_match(
[
table_from_fits('doc/COSMOS_XMM.fits', area=2.0),
table_from_fits('doc/COSMOS_OPTICAL.fits', poserr_value=0.1, area=2.0),
],
match_radius = 20, # in arcsec
prior_completeness = 0.9,
)
assert len(result) == 37836, (len(result), result)
min_output_columns = ['Separation_max', 'ncat', 'dist_bayesfactor', 'dist_post', 'p_single', 'prob_has_match', 'prob_this_match']
extra_columns = ['Separation_XMM_OPT']
for col in min_output_columns + extra_columns:
assert col in result.columns, ('looking for', col, 'in', result.columns)
result = nwaylib.nway_match(
[
table_from_fits('doc/COSMOS_XMM.fits', area=2.0),
table_from_fits('doc/COSMOS_OPTICAL.fits', poserr_value=0.1, area=2.0, magnitude_columns=[('MAG', 'auto')]),
],
match_radius = 20, # in arcsec
prior_completeness = 0.9,
store_mag_hists = False,
mag_include_radius = 4.0, # in arcsec
)
assert len(result) == 37836, (len(result), result)
extra_columns = ['Separation_XMM_OPT', 'bias_OPT_MAG']
for col in min_output_columns + extra_columns:
assert col in result.columns, ('looking for', col, 'in', result.columns)
result = nwaylib.nway_match(
[
table_from_fits('doc/COSMOS_XMM.fits', area=2.0),
table_from_fits('doc/COSMOS_OPTICAL.fits', poserr_value=0.1, area=2.0, magnitude_columns=[('MAG', 'auto')]),
],
match_radius = 20, # in arcsec
prior_completeness = 0.9,
mag_include_radius = 4.0, # in arcsec
)
assert len(result) == 37836, (len(result), result)
for col in min_output_columns + extra_columns:
assert col in result.columns, ('looking for', col, 'in', result.columns)
result = nwaylib.nway_match(
[
table_from_fits('doc/COSMOS_XMM.fits', area=2.0),
table_from_fits('doc/COSMOS_OPTICAL.fits', poserr_value=0.1, area=2.0, magnitude_columns=[('MAG', 'auto')]),
table_from_fits('doc/COSMOS_IRAC.fits', poserr_value=0.5, area=2.0, magnitude_columns=[('mag_ch1', 'auto')]),
],
match_radius = 20,
prior_completeness = 0.9,
)
assert len(result) == 387601, (len(result), result)
extra_columns = ['Separation_XMM_OPT', 'Separation_OPT_IRAC', 'Separation_XMM_IRAC', 'bias_OPT_MAG', 'bias_IRAC_mag_ch1']
for col in min_output_columns + extra_columns:
assert col in result.columns, ('looking for', col, 'in', result.columns)
result = nwaylib.nway_match(
[
table_from_fits('doc/COSMOS_XMM.fits', area=2.0),
table_from_fits('doc/COSMOS_OPTICAL.fits', poserr_value=0.1, area=2.0, magnitude_columns=[('MAG', 'OPT_MAG_fit.txt')]),
table_from_fits('doc/COSMOS_IRAC.fits', poserr_value=0.5, area=2.0, magnitude_columns=[('mag_ch1', 'IRAC_mag_ch1_fit.txt')]),
],
match_radius = 20,
prior_completeness = 0.9,
logger=logger.NullOutputLogger(),
)
assert len(result) == 387601, (len(result), result)
for col in min_output_columns + extra_columns:
assert col in result.columns, ('looking for', col, 'in', result.columns)
"""
if not filenames[0].endswith('shifted.fits'):
print()
print()
print(' You can calibrate a p_any cut-off with the following steps:')
print(' 1) Create a offset catalogue to simulate random sky positions:')
shiftfile = filenames[0].replace('.fits', '').replace('.FITS', '') + '-fake.fits'
shiftoutfile = outfile + '-fake.fits'
print(' nway-create-fake-catalogue.py --radius %d %s %s' % (args.radius*2, filenames[0], shiftfile))
print(' 2) Match the offset catalogue in the same way as this run:')
newargv = []
i = 0
while i < len(sys.argv):
v = sys.argv[i]
if v == filenames[0]:
newargv.append(shiftfile)
elif v == '--mag':
newargv.append(v)
v = sys.argv[i+1]
newargv.append(v)
if sys.argv[i+2] == 'auto':
newargv.append(v.replace(':', '_') + '_fit.txt')
else:
newargv.append(sys.argv[i+2])
i = i + 2
elif v == '--out':
newargv.append(v)
i = i + 1
newargv.append(shiftoutfile)
elif v.startswith('--out='):
newargv.append('--out=' + shiftoutfile)
else:
newargv.append(v)
i = i + 1
print(' ' + ' '.join(newargv))
print(' 3) determining the p_any cutoff that corresponds to a false-detection rate')
print(' nway-calibrate-cutoff.py %s %s' % (outfile, shiftoutfile))
print()
# write out fits file
print()
print('creating output FITS file ...')
tbhdu = match.fits_from_columns(pyfits.ColDefs(columns))
hdulist = match.wraptable2fits(tbhdu, 'MULTIMATCH')
hdulist[0].header['METHOD'] = 'multi-way matching'
hdulist[0].header['INPUT'] = ', '.join(filenames)
hdulist[0].header['TABLES'] = ', '.join(table_names)
hdulist[0].header['BIASING'] = ', '.join(biases.keys())
hdulist[0].header['NWAYCMD'] = ' '.join(sys.argv)
for k, v in args.__dict__.items():
hdulist[0].header.add_comment("argument %s: %s" % (k, v))
hdulist[0].header.update(match_header)
print(' writing "%s" (%d rows, %d columns) ...' % (outfile, len(tbhdu.data), len(columns)))
hdulist.writeto(outfile, **progress.kwargs_overwrite_true)
import nwaylib.checkupdates
nwaylib.checkupdates.checkupdates()
"""