2
2
Base class for instrument objects.
3
3
"""
4
4
import astropy .units as u
5
+ import astropy .wcs
5
6
import copy
6
7
import numpy as np
7
8
import pathlib
10
11
11
12
from astropy .coordinates import SkyCoord
12
13
from dataclasses import dataclass
14
+ from functools import cached_property
13
15
from scipy .interpolate import interp1d
14
16
from scipy .ndimage import gaussian_filter
15
17
from sunpy .coordinates import HeliographicStonyhurst , Helioprojective
@@ -45,14 +47,20 @@ class InstrumentBase:
45
47
Coordinate of the observing instrument
46
48
resolution : `~astropy.units.Quantity`
47
49
cadence : `~astropy.units.Quantity`, optional
48
- If specified, this is used to construct the observing time
50
+ If specified, this is used to construct the observing time.
49
51
pad_fov : `~astropy.units.Quantity`, optional
50
52
Two-dimensional array specifying the padding to apply to the field of view of the synthetic
51
- image in both directions. If None, no padding is applied and the field of view is defined
52
- by the maximal extent of the loop coordinates in each direction.
53
+ image in both directions in pixel space. If None, no padding is applied and the field of
54
+ view is defined by the maximal extent of the loop coordinates in each direction.
55
+ Note that if ``fov_center`` and ``fov_width`` are specified, this is ignored.
53
56
fov_center : `~astropy.coordinates.SkyCoord`, optional
57
+ Reference coordinate coinciding with the center of the field of view.
58
+ For this to have an effect, must also specify ``fov_width``.
54
59
fov_width : `~astropy.units.Quantity`, optional
60
+ Angular extent of the field of the view.
61
+ For this to have an effect, must also specify ``fov_center``.
55
62
average_over_los : `bool`, optional
63
+ Set to true for non-volumetric quantities
56
64
"""
57
65
58
66
@u .quantity_input
@@ -61,8 +69,8 @@ def __init__(self,
61
69
observer ,
62
70
resolution : u .Unit ('arcsec/pix' ),
63
71
cadence : u .s = None ,
64
- pad_fov : u .arcsec = None ,
65
- fov_center = None ,
72
+ pad_fov : u .pixel = None ,
73
+ fov_center = None ,
66
74
fov_width : u .arcsec = None ,
67
75
average_over_los = False ):
68
76
self .observer = observer
@@ -104,20 +112,20 @@ def resolution(self, value):
104
112
105
113
@property
106
114
def observer (self ):
107
- return self ._observer . transform_to ( HeliographicStonyhurst )
115
+ return self ._observer
108
116
109
117
@observer .setter
110
118
def observer (self , value ):
111
- self ._observer = value
119
+ self ._observer = value . transform_to ( HeliographicStonyhurst )
112
120
113
121
@property
114
- def pad_fov (self ) -> u .arcsec :
122
+ def pad_fov (self ) -> u .pixel :
115
123
return self ._pad_fov
116
124
117
125
@pad_fov .setter
118
126
def pad_fov (self , value ):
119
127
if value is None :
120
- value = [0 , 0 ] * u .arcsec
128
+ value = [0 , 0 ] * u .pixel
121
129
self ._pad_fov = value
122
130
123
131
@property
@@ -144,21 +152,17 @@ def get_instrument_name(self, channel):
144
152
return self .name
145
153
146
154
def calculate_intensity_kernel (self , * args , ** kwargs ):
147
- """
148
- Converts emissivity for a particular transition to counts per detector channel. When writing
149
- a new instrument class, this method should be overridden.
150
- """
151
- raise NotImplementedError ('No detect method implemented.' )
155
+ raise NotImplementedError
152
156
153
157
@property
154
158
def projected_frame (self ):
155
- return Helioprojective (observer = self .observer , obstime = self . observer . obstime )
159
+ return Helioprojective (observer = self .observer )
156
160
157
- @property
161
+ @cached_property
158
162
@u .quantity_input
159
163
def pixel_area (self ) -> u .cm ** 2 :
160
164
"""
161
- Pixel area
165
+ Cartesian area on the surface of the Sun covered by a single pixel.
162
166
"""
163
167
sa_equiv = solar_angle_equivalency (self .observer )
164
168
res = (1 * u .pix * self .resolution ).to ('cm' , equivalencies = sa_equiv )
@@ -184,18 +188,18 @@ def observe(self, skeleton, save_directory=None, channels=None, **kwargs):
184
188
skeleton : `~synthesizAR.Skeleton`
185
189
save_directory : `str` or path-like
186
190
"""
187
- check_visible = kwargs .pop ('check_visible' , False )
188
191
if channels is None :
189
192
channels = self .channels
190
193
try :
191
194
import distributed
192
195
client = distributed .get_client ()
193
196
except (ImportError , ValueError ):
194
197
client = None
195
- coordinates = skeleton .all_coordinates
198
+ ref_coord , n_pixels = self .get_fov (skeleton .all_coordinates )
199
+ wcs = astropy .wcs .WCS (header = self .get_header (ref_coord , n_pixels ))
196
200
coordinates_centers = skeleton .all_coordinates_centers
197
- bins , bin_range = self . get_detector_array ( coordinates )
198
- coordinates_centers_projected = coordinates_centers .transform_to (self .projected_frame )
201
+ pixel_locations = wcs . world_to_pixel ( coordinates_centers )
202
+ visibilities = coordinates_centers .transform_to (self .projected_frame ). is_visible ( )
199
203
maps = {}
200
204
for channel in channels :
201
205
# Compute intensity as a function of time and field-aligned coordinate
@@ -244,20 +248,19 @@ def observe(self, skeleton, save_directory=None, channels=None, **kwargs):
244
248
kernels_interp = client .gather (kernel_interp_futures )
245
249
kernels = np .concatenate ([u .Quantity (* k ) for k in kernels_interp ], axis = 1 )
246
250
247
- header = self .get_header (channel , coordinates )
251
+ header = self .get_header (ref_coord , n_pixels , channel = channel )
248
252
# Build a map for each timestep
249
253
maps [channel .name ] = []
250
254
for i , time in enumerate (self .observing_time ):
251
255
m = self .integrate_los (
252
256
time ,
253
257
channel ,
254
258
skeleton ,
255
- coordinates_centers_projected ,
256
- bins ,
257
- bin_range ,
259
+ pixel_locations ,
260
+ n_pixels ,
261
+ visibilities ,
258
262
header ,
259
- kernels = kernels [i ],
260
- check_visible = check_visible )
263
+ kernels = kernels [i ])
261
264
m = self .convolve_with_psf (m , channel )
262
265
if save_directory is None :
263
266
maps [channel .name ].append (m )
@@ -363,8 +366,15 @@ def interpolate_to_instrument_time(kernel, loop, observing_time, axis=0):
363
366
kernel_interp = u .Quantity (f_t (observing_time .value ), kernel_unit )
364
367
return kernel_interp
365
368
366
- def integrate_los (self , time , channel , skeleton , coordinates_centers , bins , bin_range , header ,
367
- kernels = None , check_visible = False ):
369
+ def integrate_los (self ,
370
+ time ,
371
+ channel ,
372
+ skeleton ,
373
+ pixel_locations ,
374
+ n_pixels ,
375
+ visibilities ,
376
+ header ,
377
+ kernels = None ):
368
378
# Compute weights
369
379
if kernels is None :
370
380
i_time = np .where (time == self .observing_time )[0 ][0 ]
@@ -376,27 +386,24 @@ def integrate_los(self, time, channel, skeleton, coordinates_centers, bins, bin_
376
386
# average along the LOS
377
387
if not self .average_over_los :
378
388
kernels *= (skeleton .all_cross_sectional_areas / self .pixel_area ).decompose () * skeleton .all_widths
379
- if check_visible :
380
- visible = coordinates_centers .is_visible ()
381
- else :
382
- visible = np .ones (kernels .shape )
383
389
# Bin
384
- blc , trc = bin_range
390
+ # NOTE: Bin order is (y,x) or (row, column)
391
+ bins = n_pixels .to_value ('pixel' ).astype (int )
392
+ bin_edges = (np .linspace (- 0.5 , bins [1 ]- 0.5 , bins [1 ]+ 1 ),
393
+ np .linspace (- 0.5 , bins [0 ]- 0.5 , bins [0 ]+ 1 ))
385
394
hist , _ , _ = np .histogram2d (
386
- coordinates_centers .Tx .value ,
387
- coordinates_centers .Ty .value ,
388
- bins = bins ,
389
- range = ((blc .Tx .value , trc .Tx .value ), (blc .Ty .value , trc .Ty .value )),
390
- weights = kernels .to_value (self ._expected_unit ) * visible ,
395
+ pixel_locations [1 ],
396
+ pixel_locations [0 ],
397
+ bins = bin_edges ,
398
+ weights = kernels .to_value (self ._expected_unit ) * visibilities ,
391
399
)
392
400
# For some quantities, need to average over all components along a given LOS
393
401
if self .average_over_los :
394
402
_hist , _ , _ = np .histogram2d (
395
- coordinates_centers .Tx .value ,
396
- coordinates_centers .Ty .value ,
397
- bins = bins ,
398
- range = ((blc .Tx .value , trc .Tx .value ), (blc .Ty .value , trc .Ty .value )),
399
- weights = visible ,
403
+ pixel_locations [1 ],
404
+ pixel_locations [0 ],
405
+ bins = bin_edges ,
406
+ weights = visibilities ,
400
407
)
401
408
hist /= np .where (_hist == 0 , 1 , _hist )
402
409
# NOTE: Purposefully using a nonstandard key to record this time as we do not
@@ -407,60 +414,61 @@ def integrate_los(self, time, channel, skeleton, coordinates_centers, bins, bin_
407
414
new_header = copy .deepcopy (header )
408
415
new_header ['date_sim' ] = (self .observer .obstime + time ).isot
409
416
410
- return Map (hist . T , new_header )
417
+ return Map (hist , new_header )
411
418
412
- def get_header (self , channel , coordinates ):
419
+ def get_header (self , ref_coord , n_pixels : u . pixel , channel = None ):
413
420
"""
414
- Create the FITS header for a given channel and set of loop coordinates
415
- that define the needed FOV.
421
+ Create the FITS header for a given channel.
422
+
423
+ Parameters
424
+ ----------
425
+ ref_coord: `~astropy.coordinates.SkyCoord`
426
+ Reference coordinate coincident with the center of the field
427
+ of view
428
+ n_pixels: `~astropy.units.Quantity`
429
+ Pixel extent in the x (horizontal) and y (vertical) direction
430
+ channel: `ChannelBase`, optional
416
431
"""
417
- bins , bin_range = self .get_detector_array (coordinates )
418
- center = SkyCoord (Tx = (bin_range [1 ].Tx + bin_range [0 ].Tx )/ 2 ,
419
- Ty = (bin_range [1 ].Ty + bin_range [0 ].Ty )/ 2 ,
420
- frame = bin_range [0 ].frame )
421
- # FIXME: reference_pixel should be center of the frame in the pixel
422
- # coordinate system of the image.
432
+ # NOTE: channel is a kwarg so that a WCS can be computed without specifying
433
+ # a channel as these keywords do not affect the WCS
434
+ if channel is None :
435
+ instrument = None
436
+ wavelength = None
437
+ else :
438
+ instrument = self .get_instrument_name (channel )
439
+ wavelength = channel .channel
423
440
header = make_fitswcs_header (
424
- ( bins [ 1 ], bins [ 0 ] ), # swap order because it expects (row,column)
425
- center ,
426
- reference_pixel = (u . Quantity ( bins , 'pix' ) - 1 * u .pix ) / 2 , # center of lower left pixel is (0,0)
441
+ tuple ( n_pixels [:: - 1 ]. to_value ( 'pixel' ) ), # swap order because it expects (row,column)
442
+ ref_coord ,
443
+ reference_pixel = (n_pixels - 1 * u .pix ) / 2 , # center of lower left pixel is (0,0)
427
444
scale = self .resolution ,
428
445
observatory = self .observatory ,
429
- instrument = self . get_instrument_name ( channel ) ,
446
+ instrument = instrument ,
430
447
telescope = self .telescope ,
431
448
detector = self .detector ,
432
- wavelength = channel . channel ,
449
+ wavelength = wavelength ,
433
450
unit = self ._expected_unit ,
434
451
)
435
452
return header
436
453
437
- def get_detector_array (self , coordinates ):
454
+ def get_fov (self , coordinates ):
438
455
"""
439
- Calculate the number of pixels in the detector FOV and the physical coordinates of the
440
- bottom left and top right corners.
456
+ Return the coordinate at the center of the FOV and the width in pixels.
441
457
"""
442
458
if self .fov_center is not None and self .fov_width is not None :
443
459
center = self .fov_center .transform_to (self .projected_frame )
444
- bins_x = int (np .ceil ((self .fov_width [0 ] / self .resolution [0 ]).decompose ()).value )
445
- bins_y = int (np .ceil ((self .fov_width [1 ] / self .resolution [1 ]).decompose ()).value )
446
- bottom_left_corner = SkyCoord (
447
- Tx = center .Tx - self .fov_width [0 ]/ 2 ,
448
- Ty = center .Ty - self .fov_width [1 ]/ 2 ,
449
- frame = center .frame ,
450
- )
451
- top_right_corner = SkyCoord (
452
- Tx = bottom_left_corner .Tx + self .fov_width [0 ],
453
- Ty = bottom_left_corner .Ty + self .fov_width [1 ],
454
- frame = bottom_left_corner .frame
455
- )
460
+ n_pixels = (self .fov_width / self .resolution ).decompose ().to ('pixel' )
456
461
else :
457
462
# If not specified, derive FOV from loop coordinates
458
463
coordinates = coordinates .transform_to (self .projected_frame )
459
- bottom_left_corner , top_right_corner = find_minimum_fov (
460
- coordinates , padding = self .pad_fov ,
461
- )
464
+ bottom_left_corner , top_right_corner = find_minimum_fov (coordinates )
462
465
delta_x = top_right_corner .Tx - bottom_left_corner .Tx
463
466
delta_y = top_right_corner .Ty - bottom_left_corner .Ty
464
- bins_x = int (np .ceil ((delta_x / self .resolution [0 ]).decompose ()).value )
465
- bins_y = int (np .ceil ((delta_y / self .resolution [1 ]).decompose ()).value )
466
- return (bins_x , bins_y ), (bottom_left_corner , top_right_corner )
467
+ center = SkyCoord (Tx = bottom_left_corner .Tx + delta_x / 2 ,
468
+ Ty = bottom_left_corner .Ty + delta_y / 2 ,
469
+ frame = bottom_left_corner .frame )
470
+ pixels_x = int (np .ceil ((delta_x / self .resolution [0 ]).decompose ()).value )
471
+ pixels_y = int (np .ceil ((delta_y / self .resolution [1 ]).decompose ()).value )
472
+ n_pixels = u .Quantity ([pixels_x , pixels_y ], 'pixel' )
473
+ n_pixels += self .pad_fov
474
+ return center , n_pixels
0 commit comments