# -*- coding: utf-8 -*-
"""
pyDIS: A simple one dimensional spectra reduction and analysis package
Created with the Apache Point Observatory (APO) 3.5-m telescope's
Dual Imaging Spectrograph (DIS) in mind. YMMV
e.g. DIS specifics:
- have BLUE/RED channels
- hand-code in that the RED channel wavelength is backwards
- dispersion along the X, spatial along the Y axis
"""
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.widgets import Cursor
import os
import numpy as np
from astropy.io import fits
from astropy.convolution import convolve, Box1DKernel
from scipy.optimize import curve_fit
import scipy.signal
from scipy.interpolate import UnivariateSpline
from scipy.interpolate import SmoothBivariateSpline
import warnings
# import datetime
# from matplotlib.widgets import SpanSelector
__all__ = ['OpenImg', 'ap_trace', 'ap_extract', 'HeNeAr_fit', 'mapwavelength',
'biascombine', 'flatcombine', 'line_trace', 'find_peaks',
'lines_to_surface', 'normalize', 'DefFluxCal', 'ApplyFluxCal',
'AirmassCor']
def _mag2flux(wave, mag, zeropt=48.60):
'''
Convert magnitudes to flux units. This is important for dealing with standards
and files from IRAF, which are stored in AB mag units. To be clear, this converts
to "PHOTFLAM" units in IRAF-speak. Assumes the common flux zeropoint used in IRAF
Parameters
----------
wave : 1d numpy array
The wavelength of the data points
mag : 1d numpy array
The magnitudes of the data
zeropt : float, optional
Conversion factor for mag->flux. (Default is 48.60)
Returns
-------
Flux values!
'''
c = 2.99792458e18 # speed of light, in A/s
flux = 10.0**( (mag + zeropt) / (-2.5) )
return flux * (c / wave**2.0)
def _gaus(x, a, b, x0, sigma):
"""
Simple Gaussian function, for internal use only
Parameters
----------
x : float or 1-d numpy array
The data to evaluate the Gaussian over
a : float
the amplitude
b : float
the constant offset
x0 : float
the center of the Gaussian
sigma : float
the width of the Gaussian
Returns
-------
Array or float of same type as input (x).
"""
return a * np.exp(-(x - x0)**2 / (2 * sigma**2)) + b
def _WriteSpec(spec, wfinal, ffinal, efinal, trace):
# write file with the trace (y positions)
tout = open(spec+'.trace','w')
tout.write('# This file contains the x,y coordinates of the trace \n')
for k in range(len(trace)):
tout.write(str(k)+', '+str(trace[k]) + '\n')
tout.close()
# write the final spectrum out
fout = open(spec+'.spec','w')
fout.write('# This file contains the final extracted (wavelength,flux,err) data \n')
for k in range(len(wfinal)):
fout.write(str(wfinal[k]) + ' ' + str(ffinal[k]) + ' ' + str(efinal[k]) + '\n')
fout.close()
return
def _CheckMono(wave):
'''
Check if the wavelength array is monotonically increasing. Return a
warning if not. NOTE: because RED/BLUE wavelength direction is flipped
it has to check both increasing and decreasing. It must satisfy one!
Method adopted from here:
http://stackoverflow.com/a/4983359/4842871
'''
# increasing
up = all(x<y for x, y in zip(wave, wave[1:]))
# decreasing
dn = all(x>y for x, y in zip(wave, wave[1:]))
if (up is False) and (dn is False):
print("WARNING: Wavelength array is not monotonically increasing!")
return
[docs]class OpenImg(object):
"""
A simple wrapper for astropy.io.fits (pyfits) to open and extract
the data we want from images and headers.
Parameters
----------
file : string
The path to the image to open
trim : bool, optional
Trim the image using the DATASEC keyword in the header, assuming
has format of [0:1024,0:512] (Default is True)
Returns
-------
image object
"""
def __init__(self, file, trim=True):
self.file = file
self.trim = trim
hdu = fits.open(file)
if trim is True:
self.datasec = hdu[0].header['DATASEC'][1:-1].replace(':',',').split(',')
d = list(map(int, self.datasec))
self.data = hdu[0].data[d[2]-1:d[3],d[0]-1:d[1]]
else:
self.data = hdu[0].data
if 'AIRMASS' in hdu[0].header:
self.airmass = hdu[0].header['AIRMASS']
elif 'ZD' in hdu[0].header:
# try using the Zenith Distance (assume in degrees)
ZD = hdu[0].header['ZD'] / 180.0 * np.pi
self.airmass = 1.0/np.cos(ZD) # approximate airmass
else:
self.airmass = 1.0
# compute the approximate wavelength solution
try:
self.disp_approx = hdu[0].header['DISPDW']
self.wcen_approx = hdu[0].header['DISPWC']
# the red chip wavelength is backwards (DIS specific)
clr = hdu[0].header['DETECTOR']
if (clr.lower()=='red'):
sign = -1.0
else:
sign = 1.0
self.wavelength = (np.arange(self.data.shape[1]) -
(self.data.shape[1])/2.0) * \
self.disp_approx * sign + self.wcen_approx
except KeyError:
# if these keywords aren't in the header, just return pixel #
self.wavelength = np.arange(self.data.shape[1])
self.exptime = hdu[0].header['EXPTIME']
hdu.close(closed=True)
# return raw, exptime, airmass, wapprox
[docs]def biascombine(biaslist, output='BIAS.fits', trim=True, silent=True):
"""
Combine the bias frames in to a master bias image. Currently only
supports median combine.
Parameters
----------
biaslist : str
Path to file containing list of bias images.
output: str, optional
Name of the master bias image to write. (Default is "BIAS.fits")
trim : bool, optional
Trim the image using the DATASEC keyword in the header, assuming
has format of [0:1024,0:512] (Default is True)
silent : bool, optional
If False, print details about the biascombine. (Default is True)
Returns
-------
bias : 2-d array
The median combined master bias image
"""
# assume biaslist is a simple text file with image names
# e.g. ls flat.00*b.fits > bflat.lis
files = np.genfromtxt(biaslist,dtype=np.str)
if silent is False:
print('biascombine: combining ' + str(len(files)) + ' files in ' + biaslist)
for i in range(0,len(files)):
hdu_i = fits.open(files[i])
if trim is False:
im_i = hdu_i[0].data
if trim is True:
datasec = hdu_i[0].header['DATASEC'][1:-1].replace(':',',').split(',')
d = list(map(int, datasec))
im_i = hdu_i[0].data[d[2]-1:d[3],d[0]-1:d[1]]
# create image stack
if (i==0):
all_data = im_i
elif (i>0):
all_data = np.dstack( (all_data, im_i) )
hdu_i.close(closed=True)
# do median across whole stack
bias = np.nanmedian(all_data, axis=2)
# write output to disk for later use
hduOut = fits.PrimaryHDU(bias)
hduOut.writeto(output, overwrite=True)
return bias
def overscanbias(img, cols=(1,), rows=(1,)):
'''
Generate a bias frame based on overscan region.
Can work with rows or columns, pass either kwarg the limits:
>>> bias = overscanbias(imagedata, cols=(1024,1050)) # doctest: +SKIP
'''
bias = np.zeros_like(img)
if len(cols) > 1:
bcol = np.nanmean(img[:, cols[0]:cols[1]], axis=0)
for j in range(img.shape()[1]):
img[j,:] = bcol
elif len(rows) > 1:
brow = np.nanmean(img[rows[0]:rows[1], :], axis=1)
for j in range(img.shape()[0]):
img[j,:] = brow
else:
print('OVERSCANBIAS ERROR: need to pass either cols=(a,b) or rows=(a,b),')
print('setting bias = zero as result!')
return bias
[docs]def flatcombine(flatlist, bias, output='FLAT.fits', trim=True, mode='spline',
display=True, flat_poly=5, response=True, Saxis=1):
"""
Combine the flat frames in to a master flat image. Subtracts the
master bias image first from each flat image. Currently only
supports median combining the images.
Parameters
----------
flatlist : str
Path to file containing list of flat images.
bias : str or 2-d array
Either the path to the master bias image (str) or
the output from 2-d array output from biascombine
output : str, optional
Name of the master flat image to write. (Default is "FLAT.fits")
response : bool, optional
If set to True, first combines the median image stack along the
spatial (Y) direction, then fits polynomial to 1D curve, then
divides each row in flat by this structure. This nominally divides
out the spectrum of the flat field lamp. (Default is True)
trim : bool, optional
Trim the image using the DATASEC keyword in the header, assuming
has format of [0:1024,0:512] (Default is True)
display : bool, optional
Set to True to show 1d flat, and final flat (Default is False)
flat_poly : int, optional
Polynomial order to fit 1d flat curve with. Only used if
response is set to True. (Default is 5)
Saxis : int, optional
Set which axis the spatial dimension is along. 1 = Y axis, 0 = X.
(Default is 1)
Returns
-------
flat : 2-d array
The median combined master flat
"""
# read the bias in, BUT we don't know if it's the numpy array or file name
if isinstance(bias, str):
# read in file if a string
bias_im = fits.open(bias)[0].data
else:
# assume is proper array from biascombine function
bias_im = bias
# assume flatlist is a simple text file with image names
# e.g. ls flat.00*b.fits > bflat.lis
files = np.genfromtxt(flatlist,dtype=np.str)
for i in range(0,len(files)):
hdu_i = fits.open(files[i])
if trim is False:
im_i = hdu_i[0].data - bias_im
if trim is True:
datasec = hdu_i[0].header['DATASEC'][1:-1].replace(':',',').split(',')
d = list(map(int, datasec))
im_i = hdu_i[0].data[d[2]-1:d[3],d[0]-1:d[1]] - bias_im
# check for bad regions (not illuminated) in the spatial direction
ycomp = im_i.sum(axis=Saxis) # compress to spatial axis only
illum_thresh = 0.8 # value compressed data must reach to be used for flat normalization
ok = np.where( (ycomp>= np.nanmedian(ycomp)*illum_thresh) )
# assume a median scaling for each flat to account for possible different exposure times
if (i==0):
all_data = im_i / np.nanmedian(im_i[ok,:])
elif (i>0):
all_data = np.dstack( (all_data, im_i / np.nanmedian(im_i[ok,:])) )
hdu_i.close(closed=True)
# do median across whole stack of flat images
flat_stack = np.nanmedian(all_data, axis=2)
# define the wavelength axis
Waxis = 0
# add a switch in case the spatial/wavelength axis is swapped
if Saxis is 0:
Waxis = 1
if response is True:
xdata = np.arange(all_data.shape[1]) # x pixels
# sum along spatial axis, smooth w/ 5pixel boxcar, take log of summed flux
flat_1d = np.log10(convolve(flat_stack[ok,:].mean(axis=Waxis), Box1DKernel(5)))
if mode=='spline':
spl = UnivariateSpline(xdata, flat_1d, ext=0, k=2 ,s=0.001)
flat_curve = 10.0**spl(xdata)
elif mode=='poly':
# fit log flux with polynomial
flat_fit = np.polyfit(xdata, flat_1d, flat_poly)
# get rid of log
flat_curve = 10.0**np.polyval(flat_fit, xdata)
if display is True:
plt.figure()
plt.plot(10.0**flat_1d)
plt.plot(xdata, flat_curve,'r')
plt.show()
# divide median stacked flat by this RESPONSE curve
flat = np.zeros_like(flat_stack)
if Saxis is 1:
for i in range(flat_stack.shape[Waxis]):
flat[i,:] = flat_stack[i,:] / flat_curve
else:
for i in range(flat_stack.shape[Waxis]):
flat[:,i] = flat_stack[:,i] / flat_curve
else:
flat = flat_stack
if display is True:
plt.figure()
plt.imshow(flat, origin='lower',aspect='auto')
plt.show()
# write output to disk for later use
hduOut = fits.PrimaryHDU(flat)
hduOut.writeto(output, overwrite=True)
return flat ,ok[0]
[docs]def ap_trace(img, fmask=(1,), nsteps=20, interac=False,
recenter=False, prevtrace=(0,), bigbox=15,
Saxis=1, display=False):
"""
Trace the spectrum aperture in an image
Assumes wavelength axis is along the X, spatial axis along the Y.
Chops image up in bins along the wavelength direction, fits a Gaussian
within each bin to determine the spatial center of the trace. Finally,
draws a cubic spline through the bins to up-sample the trace.
Parameters
----------
img : 2d numpy array
This is the image, stored as a normal numpy array. Can be read in
using astropy.io.fits like so:
>>> hdu = fits.open('file.fits') # doctest: +SKIP
>>> img = hdu[0].data # doctest: +SKIP
nsteps : int, optional
Keyword, number of bins in X direction to chop image into. Use
fewer bins if ap_trace is having difficulty, such as with faint
targets (default is 50, minimum is 4)
fmask : array-like, optional
A list of illuminated rows in the spatial direction (Y), as
returned by flatcombine.
interac : bool, optional
Set to True to have user click on the y-coord peak. (Default is
False)
recenter : bool, optional
Set to True to use previous trace, but allow small shift in
position. Currently only allows linear shift (Default is False)
bigbox : float, optional
The number of sigma away from the main aperture to allow to trace
display : bool, optional
If set to true display the trace over-plotted on the image
Saxis : int, optional
Set which axis the spatial dimension is along. 1 = Y axis, 0 = X.
(Default is 1)
Returns
-------
my : array
The spatial (Y) positions of the trace, interpolated over the
entire wavelength (X) axis
"""
# define the wavelength axis
Waxis = 0
# add a switch in case the spatial/wavelength axis is swapped
if Saxis is 0:
Waxis = 1
print('Tracing Aperture using nsteps='+str(nsteps))
# the valid y-range of the chip
if (len(fmask)>1):
ydata = np.arange(img.shape[Waxis])[fmask]
else:
ydata = np.arange(img.shape[Waxis])
# need at least 4 samples along the trace. sometimes can get away with very few
if (nsteps<4):
nsteps = 4
# median smooth to crudely remove cosmic rays
img_sm = scipy.signal.medfilt2d(img, kernel_size=(5,5))
#--- Pick the strongest source, good if only 1 obj on slit
ztot = img_sm.sum(axis=Saxis)[ydata]
yi = np.arange(img.shape[Waxis])[ydata]
peak_y = yi[np.nanargmax(ztot)]
peak_guess = [np.nanmax(ztot), np.nanmedian(ztot), peak_y, 2.]
#-- allow interactive mode, if mult obj on slit
if interac is True and recenter is False:
class InteracTrace(object):
def __init__(self):
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111)
self.ax.plot(yi, ztot)
plt.ylabel('Counts (Image summed in X direction)')
plt.xlabel('Y Pixel')
plt.title('Click on object!')
self.cursor = Cursor(self.ax, useblit=False, horizOn=False,
color='red', linewidth=1 )
self.connect = self.fig.canvas.mpl_connect
self.disconnect = self.fig.canvas.mpl_disconnect
self.ClickID = self.connect('button_press_event', self.__onclick__)
return
def __onclick__(self,click):
if self.fig.canvas.manager.toolbar._active is None:
self.xpoint = click.xdata
self.ypoint = click.ydata
self.disconnect(self.ClickID) # disconnect from event
self.cursor.disconnect_events()
self.cursor._update()
plt.close() # close window when clicked
return self.xpoint, self.ypoint
else:
pass
theclick = InteracTrace()
plt.show()
xcl = theclick.xpoint
# ycl = theclick.ypoint
peak_guess[2] = xcl
#-- use middle of previous trace as starting guess
if (recenter is True) and (len(prevtrace)>10):
peak_guess[2] = np.nanmedian(prevtrace)
#-- fit a Gaussian to peak
popt_tot, pcov = curve_fit(_gaus, yi[np.isfinite(ztot)], ztot[np.isfinite(ztot)], p0=peak_guess)
#-- only allow data within a box around this peak
ydata2 = ydata[np.where((ydata>=popt_tot[2] - popt_tot[3]*bigbox) &
(ydata<=popt_tot[2] + popt_tot[3]*bigbox))]
ztot = img_sm.sum(axis=Saxis)[ydata2]
yi = np.arange(img.shape[Waxis])[ydata2]
# define the X-bin edges
xbins = np.linspace(0, img.shape[Saxis], nsteps, dtype='int')
ybins = np.zeros_like(xbins, dtype='int')
for i in range(0,len(xbins)-1):
#-- fit gaussian w/i each window
if Saxis is 1:
zi = np.sum(img_sm[ydata2, xbins[i]:xbins[i+1]], axis=Saxis)
else:
zi = img_sm[xbins[i]:xbins[i+1], ydata2].sum(axis=Saxis)
pguess = [np.nanmax(zi), np.nanmedian(zi), yi[np.nanargmax(zi)], 2.]
popt,pcov = curve_fit(_gaus, yi[np.isfinite(ztot)], zi[np.isfinite(ztot)], p0=pguess)
# if gaussian fits off chip, then use chip-integrated answer
if (popt[2] <= min(ydata2)+25) or (popt[2] >= max(ydata2)-25):
ybins[i] = popt_tot[2]
popt = popt_tot
else:
ybins[i] = popt[2]
# update the box it can search over, in case a big bend in the order
# ydata2 = ydata[np.where((ydata>= popt[2] - popt[3]*bigbox) &
# (ydata<= popt[2] + popt[3]*bigbox))]
# recenter the bin positions, trim the unused bin off in Y
mxbins = (xbins[:-1]+xbins[1:]) / 2.
mybins = ybins[:-1]
# run a cubic spline thru the bins
ap_spl = UnivariateSpline(mxbins, mybins, ext=0, k=3, s=0)
# interpolate the spline to 1 position per column
mx = np.arange(0, img.shape[Saxis])
my = ap_spl(mx)
if display is True:
plt.figure()
plt.imshow(np.log10(img),origin='lower',aspect='auto',cmap=cm.Greys_r)
plt.autoscale(False)
plt.plot(mx,my,'b',lw=1)
# plt.plot(mx,my+popt_tot[3]*bigbox,'y')
# plt.plot(mx,my-popt_tot[3]*bigbox,'y')
plt.show()
print("> Trace gaussian width = "+str(popt_tot[3])+' pixels')
return my
[docs]def line_trace(img, pcent, wcent, fmask=(1,), maxbend=10, display=False):
'''
Trace the lines of constant wavelength along the spatial dimension.
To be run after peaks found in the HeNeAr lamp. Usually run internally
to HeNeAr_fit()
Method works by tracing up and down from the image center (slice) along
each HeNeAr line by 1 pixel, fitting a gaussian to find the center.
Parameters
----------
img : 2d float
the HeNeAr data
pcent : float array
the pixel center along the image slice of each HeNeAr line to trace
wcent : float array
the identified wavelength that corresponds to each peak's pixel center (pcent)
fmask : float array, optional
the illumination section to trace trace over in spatial dimension
maxbend : int, optional
How big of a width (in pixel units) to allow the bend in the HeNeAr
line to search over (Default is 10). Probably doesn't need to be
modified much.
display : bool, optional
should we display plot after? (Default is False)
Returns
-------
xcent, ycent, wcent
These are the arrays of X pixel (wavelength dimension), Y pixel
(spatial dimension), and corresponding wavelengths of each HeNeAr line.
'''
xcent_big = []
ycent_big = []
wcent_big = []
# the valid y-range of the chip
if (len(fmask)>1):
ydata = np.arange(img.shape[0])[fmask]
else:
ydata = np.arange(img.shape[0])
ybuf = 10
# split the chip in to 2 parts, above and below the center
ydata1 = ydata[np.where((ydata>=img.shape[0]/2) &
(ydata<img.shape[0]-ybuf))]
ydata2 = ydata[np.where((ydata<img.shape[0]/2) &
(ydata>ybuf))][::-1]
# plt.figure()
# plt.plot(img[img.shape[0]/2,:])
# plt.scatter(pcent, pcent*0.+np.mean(img))
# plt.show()
img_med = np.nanmedian(img)
# loop over every HeNeAr peak that had a good fit
for i in range(len(pcent)):
xline = np.arange(int(pcent[i])-maxbend,int(pcent[i])+maxbend)
# above center line (where fit was done)
for j in ydata1:
yline = img[j-ybuf:j+ybuf, int(pcent[i])-maxbend:int(pcent[i])+maxbend].sum(axis=0)
# fit gaussian, assume center at 0, width of 2
if j==ydata1[0]:
cguess = pcent[i] # xline[np.argmax(yline)]
pguess = [np.nanmax(yline), img_med, cguess, 2.]
try:
popt,pcov = curve_fit(_gaus, xline, yline, p0=pguess)
if popt[2]>0 and popt[2]<img.shape[1]:
cguess = popt[2] # update center pixel
xcent_big = np.append(xcent_big, popt[2])
ycent_big = np.append(ycent_big, j)
wcent_big = np.append(wcent_big, wcent[i])
except RuntimeError:
popt = pguess
# below center line, from middle down
for j in ydata2:
yline = img[j-ybuf:j+ybuf, int(pcent[i])-maxbend:int(pcent[i])+maxbend].sum(axis=0)
# fit gaussian, assume center at 0, width of 2
if j==ydata2[0]:
cguess = pcent[i] # xline[np.argmax(yline)]
pguess = [np.nanmax(yline), img_med, cguess, 2.]
try:
popt,pcov = curve_fit(_gaus, xline, yline, p0=pguess)
if popt[2]>0 and popt[2]<img.shape[1]:
cguess = popt[2] # update center pixel
xcent_big = np.append(xcent_big, popt[2])
ycent_big = np.append(ycent_big, j)
wcent_big = np.append(wcent_big, wcent[i])
except RuntimeError:
popt = pguess
if display is True:
plt.figure()
plt.imshow(np.log10(img), origin = 'lower',aspect='auto',cmap=cm.Greys_r)
plt.colorbar()
plt.scatter(xcent_big,ycent_big,marker='|',c='r')
plt.show()
return xcent_big, ycent_big, wcent_big
[docs]def find_peaks(wave, flux, pwidth=10, pthreshold=97, minsep=1):
'''
Given a slice thru a HeNeAr image, find the significant peaks.
Parameters
----------
wave : `~numpy.ndarray`
Wavelength
flux : `~numpy.ndarray`
Flux
pwidth : float
the number of pixels around the "peak" to fit over
pthreshold : float
Peak threshold
minsep : float
Minimum separation
Returns
-------
Peak Pixels, Peak Wavelengths
'''
# sort data, cut top x% of flux data as peak threshold
flux_thresh = np.percentile(flux, pthreshold)
# find flux above threshold
high = np.where((flux >= flux_thresh))
# find individual peaks (separated by > 1 pixel)
pk = high[0][1:][ ( (high[0][1:]-high[0][:-1]) > minsep ) ]
# offset from start/end of array by at least same # of pixels
pk = pk[pk > pwidth]
pk = pk[pk < (len(flux) - pwidth)]
# print('Found '+str(len(pk))+' peaks in HeNeAr to fit Gaussians to')
pcent_pix = np.zeros_like(pk,dtype='float')
wcent_pix = np.zeros_like(pk,dtype='float') # wtemp[pk]
# for each peak, fit a gaussian to find center
for i in range(len(pk)):
xi = wave[pk[i] - pwidth:pk[i] + pwidth]
yi = flux[pk[i] - pwidth:pk[i] + pwidth]
pguess = (np.nanmax(yi), np.nanmedian(flux), float(np.nanargmax(yi)), 2.)
try:
popt,pcov = curve_fit(_gaus, np.arange(len(xi),dtype='float'), yi,
p0=pguess)
# the gaussian center of the line in pixel units
pcent_pix[i] = (pk[i]-pwidth) + popt[2]
# and the peak in wavelength units
wcent_pix[i] = xi[np.nanargmax(yi)]
except RuntimeError:
pcent_pix[i] = float('nan')
wcent_pix[i] = float('nan')
wcent_pix, ss = np.unique(wcent_pix, return_index=True)
pcent_pix = pcent_pix[ss]
okcent = np.where((np.isfinite(pcent_pix)))
return pcent_pix[okcent], wcent_pix[okcent]
[docs]def lines_to_surface(img, xcent, ycent, wcent,
mode='spline2d', fit_order=2, display=False):
'''
Turn traced arc lines into a wavelength solution across the entire chip
Requires inputs from line_trace(). Outputs are a 2d wavelength solution
Parameters
----------
img : 2d array
the HeNeAr data
xcent : 1d array
the X (spatial) pixel positions of the HeNeAr lines
ycent : 1d array
the Y (wavelength) pixel positions of the HeNeAr lines
wcent : 1d array
the wavelength values of the HeNeAr lines
mode : str, {'poly', 'spline', 'spline2d'}
what mode of interpolation to use to go from traces along the
HeNeAr lines to a wavelength value for every (x,y) pixel?
Options include (1) poly: along 1-pixel wide slices in the spatial
dimension, fit a polynomial between the HeNeAr lines. Uses fit_order;
(2) spline: along 1-pixel wide slices in the spatial dimension,
fit a quadratic spline; (3) spline2d: fit a full 2d surface using a
cubic spline. This is the best option, in principle.
Returns
-------
the 2d wavelenth solution. Output depends on mode parameter.
'''
xsz = img.shape[1]
# fit the wavelength solution for the entire chip w/ a 2d spline
if (mode=='spline2d'):
xfitd = 5 # the spline dimension in the wavelength space
print('Fitting Spline2d - NOTE: this mode doesnt work well')
wfit = SmoothBivariateSpline(xcent, ycent, wcent, kx=xfitd, ky=3,
bbox=[0,img.shape[1],0,img.shape[0]], s=0)
#elif mode=='poly2d':
## using 2d polyfit
# wfit = polyfit2d(xcent_big, ycent_big, wcent_big, order=3)
elif mode=='spline':
wfit = np.zeros_like(img)
xpix = np.arange(xsz)
for i in np.arange(ycent.min(), ycent.max()):
x = np.where((ycent == i))
x_u, ind_u = np.unique(xcent[x], return_index=True)
# this smoothing parameter is absurd...
spl = UnivariateSpline(x_u, wcent[x][ind_u], ext=0, k=3, s=5e7)
if display is True:
plt.figure()
plt.scatter(xcent[x][ind_u], wcent[x][ind_u])
plt.plot(xpix, spl(xpix))
plt.show()
wfit[int(i),:] = spl(xpix)
elif mode=='poly':
wfit = np.zeros_like(img)
xpix = np.arange(xsz)
for i in np.arange(ycent.min(), ycent.max()):
x = np.where((ycent == i))
coeff = np.polyfit(xcent[x], wcent[x], fit_order)
wfit[int(i),:] = np.polyval(coeff, xpix)
return wfit
[docs]def HeNeAr_fit(calimage, linelist='apohenear.dat', interac=True,
trim=True, fmask=(1,), display=False,
tol=10, fit_order=2, previous='',mode='poly',
second_pass=True):
"""
Determine the wavelength solution to be used for the science images.
Can be done either automatically (buyer beware) or manually. Both the
manual and auto modes use a "slice" through the chip center to learn
the wavelengths of specific HeNeAr lines. Emulates the IDENTIFY
function in IRAF.
If the automatic mode is selected (interac=False), program tries to
first find significant peaks in the "slice", then uses a brute-force
guess scheme based on the grating information in the header. While
easy, your mileage may vary with this method.
If the interactive mode is selected (interac=True), you click on
features in the "slice" and identify their wavelengths.
Parameters
----------
calimage : str
Path to the HeNeAr calibration image
linelist : str, optional
The linelist file to use in the resources/linelists/ directory.
Only used in automatic mode. (Default is apohenear.dat)
interac : bool, optional
Should the HeNeAr identification be done interactively (manually)?
(Default is True)
trim : bool, optional
Trim the image using the DATASEC keyword in the header, assuming
has format of [0:1024,0:512] (Default is True)
fmask : array-like, optional
A list of illuminated rows in the spatial direction (Y), as
returned by flatcombine.
display : bool, optional
tol : int, optional
When in automatic mode, the tolerance in pixel units between
linelist entries and estimated wavelengths for the first few
lines matched... use carefully. (Default is 10)
mode : str, optional
What type of function to use to fit the entire 2D wavelength
solution? Options include (poly, spline2d). (Default is poly)
fit_order : int, optional
The polynomial order to use to interpolate between identified
peaks in the HeNeAr (Default is 2)
previous : string, optional
name of file containing previously identified peaks. Still has to
do the fitting.
Returns
-------
wfit : bivariate spline object or 2d polynomial
The wavelength solution at every pixel. Output type depends on the
mode keyword above (poly is recommended)
"""
print('Running HeNeAr_fit function on file '+calimage)
# silence the polyfit warnings
warnings.simplefilter('ignore', np.RankWarning)
hdu = fits.open(calimage)
if trim is False:
img = hdu[0].data
if trim is True:
datasec = hdu[0].header['DATASEC'][1:-1].replace(':',',').split(',')
d = list(map(int, datasec))
img = hdu[0].data[d[2]-1:d[3],d[0]-1:d[1]]
# this approach will be very DIS specific
try:
disp_approx = hdu[0].header['DISPDW']
wcen_approx = hdu[0].header['DISPWC']
except KeyError:
disp_approx = 0.
wcen_approx = 1.
try:
# the red chip wavelength is backwards (DIS specific)
clr = hdu[0].header['DETECTOR']
if (clr.lower() == 'red'):
sign = -1.0
else:
sign = 1.0
except KeyError:
sign = 1.0
hdu.close(closed=True)
#-- this is how I *want* to do this. Need to header values later though...
# img, _, _, wtemp = OpenImg(calimage, trim=trim)
# take a slice thru the data (+/- 10 pixels) in center row of chip
slice = img[int(img.shape[0]/2-10):int(img.shape[0]/2+10),:].sum(axis=0)
# use the header info to do rough solution (linear guess)
wtemp = (np.arange(len(slice))-len(slice)/2) * disp_approx * sign + wcen_approx
###### IDENTIFY (auto and interac modes)
# = = = = = = = = = = = = = = = =
#-- automatic mode
if (interac is False):
if (len(previous)==0):
print("Doing automatic wavelength calibration on HeNeAr.")
print("Note, this is not very robust. Suggest you re-run with interac=True")
# find the linelist of choice
linelists_dir = os.path.dirname(os.path.realpath(__file__))+ '/resources/linelists/'
# if (len(linelist)==0):
# linelist = os.path.join(linelists_dir, linelist)
# import the linelist
linewave = np.genfromtxt(os.path.join(linelists_dir, linelist), dtype='float',
skip_header=1,usecols=(0,),unpack=True)
pcent_pix, wcent_pix = find_peaks(wtemp, slice, pwidth=10, pthreshold=97)
# loop thru each peak, from center outwards. a greedy solution
# find nearest list line. if not line within tolerance, then skip peak
pcent = []
wcent = []
# find center-most lines, sort by dist from center pixels
ss = np.argsort(np.abs(wcent_pix-wcen_approx))
#coeff = [0.0, 0.0, disp_approx*sign, wcen_approx]
coeff = np.append(np.zeros(fit_order-1),(disp_approx*sign, wcen_approx))
for i in range(len(pcent_pix)):
xx = pcent_pix-len(slice)/2
#wcent_pix = coeff[3] + xx * coeff[2] + coeff[1] * (xx*xx) + coeff[0] * (xx*xx*xx)
wcent_pix = np.polyval(coeff, xx)
if display is True:
plt.figure()
plt.plot(wtemp, slice, 'b')
plt.scatter(linewave,np.ones_like(linewave)*np.nanmax(slice),marker='o',c='cyan')
plt.scatter(wcent_pix,np.ones_like(wcent_pix)*np.nanmax(slice)/2.,marker='*',c='green')
plt.scatter(wcent_pix[ss[i]],np.nanmax(slice)/2., marker='o',c='orange')
# if there is a match w/i the linear tolerance
if (min((np.abs(wcent_pix[ss][i] - linewave))) < tol):
# add corresponding pixel and *actual* wavelength to output vectors
pcent = np.append(pcent,pcent_pix[ss[i]])
wcent = np.append(wcent, linewave[np.argmin(np.abs(wcent_pix[ss[i]] - linewave))] )
if display is True:
plt.scatter(wcent,np.ones_like(wcent)*np.nanmax(slice),marker='o',c='red')
if (len(pcent)>fit_order):
coeff = np.polyfit(pcent-len(slice)/2, wcent, fit_order)
if display is True:
plt.xlim((min(wtemp),max(wtemp)))
plt.show()
lout = open(calimage+'.lines', 'w')
lout.write("# This file contains the HeNeAr lines identified [auto] Columns: (pixel, wavelength) \n")
for l in range(len(pcent)):
lout.write(str(pcent[l]) + ', ' + str(wcent[l])+'\n')
lout.close()
# the end result is the vector "coeff" has the wavelength solution for "slice"
# update the "wtemp" vector that goes with "slice" (fluxes)
wtemp = np.polyval(coeff, (np.arange(len(slice))-len(slice)/2))
if (len(previous) > 0):
pcent, wcent = np.genfromtxt(previous, dtype='float',
unpack=True, skip_header=1, delimiter=',')
# = = = = = = = = = = = = = = = =
#-- manual (interactive) mode
elif (interac is True):
if (len(previous)==0):
print('')
print('Using INTERACTIVE HeNeAr_fit mode:')
print('1) Click on HeNeAr lines in plot window')
print('2) Enter corresponding wavelength in terminal and press <return>')
print(' If mis-click or unsure, just press leave blank and press <return>')
print('3) To delete an entry, click on label, enter "d" in terminal, press <return>')
print('4) Close plot window when finished')
xraw = np.arange(len(slice))
class InteracWave(object):
# http://stackoverflow.com/questions/21688420/callbacks-for-graphical-mouse-input-how-to-refresh-graphics-how-to-tell-matpl
def __init__(self):
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111)
self.ax.plot(wtemp, slice, 'b')
plt.xlabel('Wavelength')
plt.ylabel('Counts')
self.pcent = [] # the pixel centers of the identified lines
self.wcent = [] # the labeled wavelengths of the lines
self.ixlib = [] # library of click points
self.cursor = Cursor(self.ax, useblit=False,horizOn=False,
color='red', linewidth=1 )
self.connect = self.fig.canvas.mpl_connect
self.disconnect = self.fig.canvas.mpl_disconnect
self.clickCid = self.connect("button_press_event",self.OnClick)
def OnClick(self, event):
# only do stuff if toolbar not being used
# NOTE: this subject to change API, so if breaks, this probably why
# http://stackoverflow.com/questions/20711148/ignore-matplotlib-cursor-widget-when-toolbar-widget-selected
if self.fig.canvas.manager.toolbar._active is None:
ix = event.xdata
# if the click is in good space, proceed
if (ix is not None) and (ix > np.nanmin(slice)) and (ix < np.nanmax(slice)):
# disable button event connection
self.disconnect(self.clickCid)
# disconnect cursor, and remove from plot
self.cursor.disconnect_events()
self.cursor._update()
# get points nearby to the click
nearby = np.where((wtemp > ix-10*disp_approx) &
(wtemp < ix+10*disp_approx) )
# find if click is too close to an existing click (overlap)
kill = None
if len(self.pcent)>0:
for k in range(len(self.pcent)):
if np.abs(self.ixlib[k]-ix)<tol:
kill_d = input('> WARNING: Click too close to existing point. To delete existing point, enter "d"')
if kill_d=='d':
kill = k
if kill is not None:
del(self.pcent[kill])
del(self.wcent[kill])
del(self.ixlib[kill])
# If there are enough valid points to possibly fit a peak too...
if (len(nearby[0]) > 4) and (kill is None):
imax = np.nanargmax(slice[nearby])
pguess = (np.nanmax(slice[nearby]), np.nanmedian(slice), xraw[nearby][imax], 2.)
try:
popt,pcov = curve_fit(_gaus, xraw[nearby], slice[nearby], p0=pguess)
self.ax.plot(wtemp[int(popt[2])], popt[0], 'r|')
except ValueError:
print('> WARNING: Bad data near this click, cannot centroid line with Gaussian. I suggest you skip this one')
popt = pguess
except RuntimeError:
print('> WARNING: Gaussian centroid on line could not converge. I suggest you skip this one')
popt = pguess
# using raw_input sucks b/c doesn't raise terminal, but works for now
try:
number=float(input('> Enter Wavelength: '))
self.pcent.append(popt[2])
self.wcent.append(number)
self.ixlib.append((ix))
self.ax.plot(wtemp[int(popt[2])], popt[0], 'ro')
print(' Saving '+str(number))
except ValueError:
print("> Warning: Not a valid wavelength float!")
elif (kill is None):
print('> Error: No valid data near click!')
# reconnect to cursor and button event
self.clickCid = self.connect("button_press_event",self.OnClick)
self.cursor = Cursor(self.ax, useblit=False,horizOn=False,
color='red', linewidth=1 )
else:
pass
# run the interactive program
wavefit = InteracWave()
plt.show() #activate the display - GO!
# how I would LIKE to do this interactively:
# inside the interac mode, do a split panel, live-updated with
# the wavelength solution, and where user can edit the fit_order
# how I WILL do it instead
# a crude while loop here, just to get things moving
# after interactive fitting done, get results fit peaks
pcent = np.array(wavefit.pcent,dtype='float')
wcent = np.array(wavefit.wcent, dtype='float')
print('> You have identified '+str(len(pcent))+' lines')
lout = open(calimage+'.lines', 'w')
lout.write("# This file contains the HeNeAr lines identified [manual] Columns: (pixel, wavelength) \n")
for l in range(len(pcent)):
lout.write(str(pcent[l]) + ', ' + str(wcent[l])+'\n')
lout.close()
if (len(previous)>0):
pcent, wcent = np.genfromtxt(previous, dtype='float',
unpack=True, skip_header=1,delimiter=',')
#--- FIT SMOOTH FUNCTION ---
# fit polynomial thru the peak wavelengths
# xpix = (np.arange(len(slice))-len(slice)/2)
# coeff = np.polyfit(pcent-len(slice)/2, wcent, fit_order)
xpix = np.arange(len(slice))
coeff = np.polyfit(pcent, wcent, fit_order)
wtemp = np.polyval(coeff, xpix)
done = str(fit_order)
while (done != 'd'):
fit_order = int(done)
coeff = np.polyfit(pcent, wcent, fit_order)
wtemp = np.polyval(coeff, xpix)
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(pcent, wcent, 'bo')
ax1.plot(xpix, wtemp, 'r')
ax2.plot(pcent, wcent - np.polyval(coeff, pcent),'ro')
ax2.set_xlabel('pixel')
ax1.set_ylabel('wavelength')
ax2.set_ylabel('residual')
ax1.set_title('fit_order = '+str(fit_order))
# ylabel('wavelength')
print(" ")
print('> How does this look? Enter "d" to be done (accept), ')
print(' or a number to change the polynomial order and re-fit')
print('> Currently fit_order = '+str(fit_order))
print(" ")
plt.show()
_CheckMono(wtemp)
print(' ')
done = str(input('ENTER: "d" (done) or a # (poly order): '))
# = = = = = = = = = = = = = = = = = =
# now rough wavelength is found, either via interactive or auto mode!
#-- SECOND PASS
if second_pass is True:
linelists_dir = os.path.dirname(os.path.realpath(__file__))+ '/resources/linelists/'
hireslinelist = 'henear.dat'
linewave2 = np.genfromtxt(os.path.join(linelists_dir, hireslinelist), dtype='float',
skip_header=1, usecols=(0,), unpack=True)
tol2 = tol # / 2.0
pcent_pix2, wcent_pix2 = find_peaks(wtemp, slice, pwidth=10, pthreshold=80)
pcent2 = []
wcent2 = []
# sort from center wavelength out
ss = np.argsort(np.abs(wcent_pix2-wcen_approx))
# coeff should already be set by manual or interac mode above
# coeff = np.append(np.zeros(fit_order-1),(disp_approx*sign, wcen_approx))
for i in range(len(pcent_pix2)):
xx = pcent_pix2-len(slice)/2
wcent_pix2 = np.polyval(coeff, xx)
if (min((np.abs(wcent_pix2[ss][i] - linewave2))) < tol2):
# add corresponding pixel and *actual* wavelength to output vectors
pcent2 = np.append(pcent2, pcent_pix2[ss[i]])
wcent2 = np.append(wcent2, linewave2[np.argmin(np.abs(wcent_pix2[ss[i]] - linewave2))] )
#-- update in real time. maybe not good for 2nd pass
# if (len(pcent2)>fit_order):
# coeff = np.polyfit(pcent2-len(slice)/2, wcent2, fit_order)
if display is True:
plt.figure()
plt.plot(wtemp, slice, 'b')
plt.scatter(linewave2,np.ones_like(linewave2)*np.nanmax(slice),
marker='o',c='cyan')
plt.scatter(wcent_pix2,np.ones_like(wcent_pix2)*np.nanmax(slice)/2.,
marker='*',c='green')
plt.scatter(wcent_pix2[ss[i]],np.nanmax(slice)/2.,
marker='o',c='orange')
plt.text(np.nanmin(wcent_pix2), np.nanmax(slice)*0.95, hireslinelist)
plt.text(np.nanmin(wcent_pix2), np.nanmax(slice)/2.*1.1, 'detected lines')
plt.scatter(wcent2,np.ones_like(wcent2)*np.nanmax(slice)*1.05,marker='o',c='red')
plt.text(np.nanmin(wcent_pix2), np.nanmax(slice)*1.1, 'matched lines')
plt.ylim((np.nanmin(slice), np.nanmax(slice)*1.2))
plt.xlim((min(wtemp),max(wtemp)))
plt.show()
wtemp = np.polyval(coeff, (np.arange(len(slice))-len(slice)/2))
lout = open(calimage+'.lines2', 'w')
lout.write("# This file contains the HeNeAr lines identified [2nd pass] Columns: (pixel, wavelength) \n")
for l in range(len(pcent2)):
lout.write(str(pcent2[l]) + ', ' + str(wcent2[l])+'\n')
lout.close()
xpix = np.arange(len(slice))
coeff = np.polyfit(pcent2, wcent2, fit_order)
wtemp = np.polyval(coeff, xpix)
#--- FIT SMOOTH FUNCTION ---
if interac is True:
done = str(fit_order)
while (done != 'd'):
fit_order = int(done)
coeff = np.polyfit(pcent2, wcent2, fit_order)
wtemp = np.polyval(coeff, xpix)
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(pcent2, wcent2, 'bo')
ax1.plot(xpix, wtemp, 'r')
ax2.plot(pcent2, wcent2 - np.polyval(coeff, pcent2),'ro')
ax2.set_xlabel('pixel')
ax1.set_ylabel('wavelength')
ax2.set_ylabel('residual')
ax1.set_title('2nd pass, fit_order = '+str(fit_order))
# ylabel('wavelength')
print(" ")
print('> How does this look? Enter "d" to be done (accept), ')
print(' or a number to change the polynomial order and re-fit')
print('> Currently fit_order = '+str(fit_order))
print(" ")
plt.show()
_CheckMono(wtemp)
print(' ')
done = str(input('ENTER: "d" (done) or a # (poly order): '))
#-- trace the peaks vertically --
xcent_big, ycent_big, wcent_big = line_trace(img, pcent, wcent,
fmask=fmask, display=display)
#-- turn these vertical traces in to a whole chip wavelength solution
wfit = lines_to_surface(img, xcent_big, ycent_big, wcent_big,
mode=mode, fit_order=fit_order)
return wfit
[docs]def mapwavelength(trace, wavemap, mode='spline2d'):
"""
Compute the wavelength along the center of the trace, to be run after
the HeNeAr_fit routine.
Parameters
----------
trace : 1-d array
The spatial positions (Y axis) corresponding to the center of the
trace for every wavelength (X axis), as returned from ap_trace
wavemap : bivariate spline object or image-like wavelength map
The wavelength evaluated at every pixel, output from HeNeAr_fit
Type depends on mode parameter.
mode : str, optional
Which mode was used to generate the 2D wavelength solution in
HeNeAr_fit(), and specifically in lines_to_surface()?
Options include: poly, spline, spline2d (Default is 'spline2d')
Returns
-------
trace_wave : 1d array
The wavelength vector evaluated at each position along the trace
"""
# use the wavemap from the HeNeAr_fit routine to determine the wavelength along the trace
if mode=='spline2d':
trace_wave = wavemap.ev(np.arange(len(trace)), trace)
elif mode=='poly' or mode=='spline':
trace_wave = np.zeros_like(trace)
for i in range(len(trace)):
trace_wave[i] = np.interp(trace[i], range(wavemap.shape[0]), wavemap[:,i])
## using 2d polyfit
# trace_wave = polyval2d(np.arange(len(trace)), trace, wavemap)
return trace_wave
[docs]def normalize(wave, flux, mode='poly', order=5):
'''
Return a flattened, normalized spectrum. A model spectrum is made of
the continuum by fitting either a polynomial or spline to the data,
and then the data is normalized with the equation:
>>> norm = (flux - model) / model # doctest: +SKIP
Parameters
----------
wave : 1-d array
The object's wavelength array
flux : 1-d array
The object's flux array
mode : str, optional
Decides which mode should be used to flatten the spectrum.
Options are 'poly' (Default), 'spline', 'interac'.
order : int, optional
The polynomial order to use for mode='poly'. (Default is 3)
Returns
-------
Flux normalized spectrum at same wavelength points as the input
'''
if (mode != 'interac') and (mode != 'spline') and (mode != 'poly'):
mode = 'poly'
print("WARNING: invalid mode set in normalize. Changing to 'poly'")
if mode=='interac':
print('interac mode not built yet. sorry...')
mode = 'poly'
if mode=='poly':
fit = np.polyfit(wave, flux, order)
model = np.polyval(fit, wave)
if mode=='spline':
spl = UnivariateSpline(wave, flux, ext=0, k=2 ,s=0.0025)
model = spl(wave)
norm = (flux - model) / model
return norm
[docs]def AirmassCor(obj_wave, obj_flux, airmass, airmass_file='apoextinct.dat'):
"""
Correct the spectrum based on the airmass
Parameters
----------
obj_wave : 1-d array
The 1-d wavelength array of the spectrum
obj_flux : 1-d or 2-d array
The 1-d or 2-d flux array of the spectrum
airmass : float
The value of the airmass, not the header keyword.
airmass_file : str, {'apoextinct.dat', 'ctioextinct.dat', 'kpnoextinct.dat', 'ormextinct.dat'}
The name of the airmass extinction file. This routine assumes
the file is stored in the resources/extinction/ subdirectory.
Available files are (Default is apoextinct.dat)
Returns
-------
The flux array
"""
# read in the airmass extinction curve
extinction_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'resources/extinction')
if len(airmass_file)==0:
air_wave, air_cor = np.genfromtxt(os.path.join(extinction_dir, airmass_file),
unpack=True,skip_header=2)
else:
print('> Loading airmass library file: '+airmass_file)
# print(' Note: first 2 rows are skipped, assuming header')
air_wave, air_cor = np.genfromtxt(os.path.join(extinction_dir, airmass_file),
unpack=True,skip_header=2)
# air_cor in units of mag/airmass
airmass_ext = 10.0**(0.4 * airmass *
np.interp(obj_wave, air_wave, air_cor))
# arimas_ext is broadcast to obj_flux if it is a 2-d array
return obj_flux * airmass_ext
[docs]def DefFluxCal(obj_wave, obj_flux, stdstar='', mode='spline', polydeg=9,
display=False):
"""
Parameters
----------
obj_wave : 1-d array
The 1-d wavelength array of the spectrum
obj_flux : 1-d array
The 1-d flux array of the spectrum
stdstar : str
Name of the standard star file to use for flux calibration. You
must give the subdirectory and file name, for example:
>>> sensfunc = DefFluxCal(wave, flux, mode='spline', stdstar='spec50cal/feige34.dat') # doctest: +SKIP
If no standard is set, or an invalid standard is selected, will
return array of 1's and a warning. A list of all available
subdirectories and objects is available on the wiki, or look in
pydis/resources/onedstds/
mode : str, optional
either "linear", "spline", or "poly" (Default is spline)
polydeg : float, optional
set the order of the polynomial to fit through (Default is 9)
display : bool, optional
If True, plot the down-sampled sensfunc and fit to screen (Default
is False)
Returns
-------
sensfunc : 1-d array
The sensitivity function for the standard star
"""
stdstar2 = stdstar.lower()
std_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'resources', 'onedstds')
if os.path.isfile(os.path.join(std_dir, stdstar2)):
std_wave, std_mag, std_wth = np.genfromtxt(os.path.join(std_dir, stdstar2),
skip_header=1, unpack=True)
# standard star spectrum is stored in magnitude units
std_flux = _mag2flux(std_wave, std_mag)
# Automatically exclude these obnoxious lines...
balmer = np.array([6563, 4861, 4341], dtype='float')
# down-sample (ds) the observed flux to the standard's bins
obj_flux_ds = []
obj_wave_ds = []
std_flux_ds = []
for i in range(len(std_wave)):
rng = np.where((obj_wave >= std_wave[i] - std_wth[i] / 2.0) &
(obj_wave < std_wave[i] + std_wth[i] / 2.0))
IsH = np.where((balmer >= std_wave[i] - std_wth[i] / 2.0) &
(balmer < std_wave[i] + std_wth[i] / 2.0))
# does this bin contain observed spectra, and no Balmer line?
if (len(rng[0]) > 1) and (len(IsH[0]) == 0):
# obj_flux_ds.append(np.sum(obj_flux[rng]) / std_wth[i])
obj_flux_ds.append( np.nanmean(obj_flux[rng]) )
obj_wave_ds.append(std_wave[i])
std_flux_ds.append(std_flux[i])
# the ratio between the standard star flux and observed flux
# has units like erg / counts
ratio = np.abs(np.array(std_flux_ds, dtype='float') /
np.array(obj_flux_ds, dtype='float'))
# interp calibration (sensfunc) on to object's wave grid
# can use 3 types of interpolations: linear, cubic spline, polynomial
# if invalid mode selected, make it spline
if mode not in ('linear', 'spline', 'poly'):
mode = 'spline'
print("WARNING: invalid mode set in DefFluxCal. Changing to spline")
# actually fit the log of this sensfunc ratio
# since IRAF does the 2.5*log(ratio), everything in mag units!
LogSensfunc = np.log10(ratio)
# interpolate back on to observed wavelength grid
if mode=='linear':
sensfunc2 = np.interp(obj_wave, obj_wave_ds, LogSensfunc)
elif mode=='spline':
spl = UnivariateSpline(obj_wave_ds, LogSensfunc, ext=0, k=2 ,s=0.0025)
sensfunc2 = spl(obj_wave)
elif mode=='poly':
fit = np.polyfit(obj_wave_ds, LogSensfunc, polydeg)
sensfunc2 = np.polyval(fit, obj_wave)
if display is True:
plt.figure()
plt.plot(std_wave, std_flux, 'r', alpha=0.5, label='standard flux')
plt.xlabel('Wavelength')
plt.ylabel('Standard Star Flux')
plt.legend()
plt.show()
plt.figure()
plt.plot(obj_wave, obj_flux, 'k', label='observed counts')
plt.plot(obj_wave_ds, obj_flux_ds, 'bo',
label='downsample observed')
plt.xlabel('Wavelength')
plt.ylabel('Observed Counts/S')
plt.legend()
plt.show()
plt.figure()
plt.plot(obj_wave_ds, LogSensfunc, 'ko', label='sensfunc')
plt.plot(obj_wave, sensfunc2, label='interpolated sensfunc')
plt.xlabel('Wavelength')
plt.ylabel('log Sensfunc')
plt.legend()
plt.show()
plt.figure()
plt.plot(obj_wave, obj_flux*(10**sensfunc2),'k',
label='applied sensfunc')
plt.plot(std_wave, std_flux, 'ro', alpha=0.5, label='standard flux')
plt.xlabel('Wavelength')
plt.ylabel('Standard Star Flux')
plt.legend()
plt.show()
else:
sensfunc2 = np.zeros_like(obj_wave)
print('ERROR: in DefFluxCal no valid standard star file found at ')
print(os.path.join(std_dir, stdstar2))
return 10**sensfunc2
[docs]def ApplyFluxCal(obj_wave, obj_flux, obj_err, cal_wave, sensfunc):
# the sensfunc should already be BASICALLY at the same wavelenths as the targets
# BUT, just in case, we linearly resample it:
# ensure input array is sorted!
ss = np.argsort(cal_wave)
sensfunc2 = np.interp(obj_wave, cal_wave[ss], sensfunc[ss])
# then simply apply re-sampled sensfunc to target flux
return obj_flux * sensfunc2, obj_err * sensfunc2