__all__ = [
"smooth",
"envelope",
"normalize_orders",
"bin",
"bin_avg",
"convolve",
"derivative",
"constant_velocity_wl_grid",
"blur_rotate",
"airtovac",
"vactoair",
"clean_block"
]
import numpy as np
[docs]def smooth(fx,w,mode='box',edge_degree=1):
"""
This function takes a spectrum, and blurs it using either a
Gaussian kernel or a box kernel, which have a FWHM width of w px everywhere.
Meaning that the width changes dynamically on a constant d-lambda grid.
Set the mode to gaussian or box. Because in box, care is taken to correctly
interpolate the edges, it is about twice slower than the Gaussian.
This interpolation is done manually in the fun.box function.
This function assumes that there are no NaN values in the flux array, but doesn't presently
test for this.
"""
import numpy as np
import tayph.util as ut
from tayph.vartests import typetest,postest, nantest
import tayph.functions as fun
from matplotlib import pyplot as plt
typetest(w,[int,float],'w in ops.smooth()')
typetest(fx,np.ndarray,'fx in ops.smooth()')
typetest(mode,str,'mode in ops.smooth()')
typetest(edge_degree,int,'edge_degree in ops.smooth()')
postest(w,'w in ops.smooth()')
nantest(fx,'fx in ops.smooth()')
nantest(w,'w in ops.smooth()')
mode=mode.lower()
if mode not in ['box','gaussian']:
raise Exception(f'RuntimeError in ops.smooth(): Mode should be set to "box" or "gaussian" ({mode}).')
truncsize=4.0#The gaussian is truncated at 4 sigma.
shape=np.shape(fx)
sig_w = w / 2*np.sqrt(2.0*np.log(2)) #Transform FWHM to Gaussian sigma. In km/s.
trunc_dist=np.round(sig_w*truncsize).astype(int)
#First define the kernel.
kw=int(np.round(truncsize*sig_w*2.0))
if kw % 2.0 != 1.0:#This is to make sure that the kernel has an odd number of
#elements, and that it is symmetric around zero.
kw+=1
kx=np.arange(kw,dtype=float) #fun.findgen(kw)
kx-=np.mean(kx)#This must be centered around zero. Doing a hardcoded check:
if (-1.0)*kx[-1] != kx[0]:
print(kx)
raise Exception("ERROR in box_smooth: Kernel could not be made symmetric somehow. Attempted kernel grid is printed above. Kernel width is %s pixels." % kw)
if mode == 'gaussian':
k=fun.gaussian(kx,1.0,0.0,sig_w)
if mode == 'box':
k=fun.box(kx,1.0,0.0,w)
kx=kx[k > 0.0]
k=k[k > 0.0]
if (-1.0)*kx[-1] != kx[0]:
print(kx)
raise Exception("ERROR in box_smooth: Kernel could not be made symmetric AFTER CROPPING OUT THE BOX, somehow. Attempted kernel grid is printed above. Kernel width is %s pixels." % kw)
k/=np.sum(k)
return(convolve(fx,k,edge_degree))
[docs]def envelope(wlm,fxm,binsize,selfrac=0.05,mode='top',threshold=''):
"""
This routine measures the top or bottom envelope of a spectrum (wl,fx), by
chopping it up into bins of size binsze (unit of wl), and measuring the mean
of the top n-% of values in that bin. Setting the mode to 'bottom' will do the
oppiste: The mean of the bottom n-% of values. The output is the resulting wl
and flux points of these bins.
Parameters
----------
wlm : np.ndarray
The wavelength axis.
fxm : np.ndarray
The flux axis.
binsize : int, float
The width of the bin within which to measure the continuum, in units of wavelength.
selfrac: float
The top fraction of values in each bin to be used to measure the continuum. Default 0.05.
mode : str
The direction in which the envelope is measured. Can be set to "top" or "bottom".
Default "top".
Returns
-------
wle : np.array
The wavelength points associated with the envelope.
fxe : np.array
The corresponding flux points describing the envelope.
Example
-------
>>> wle,fxe = envelope(wl,fx,1.0,selfrac=0.03,mode='top')
"""
import numpy as np
import tayph.util as ut
import tayph.functions as fun
from tayph.vartests import typetest,dimtest,nantest,postest
from matplotlib import pyplot as plt
typetest(wlm,np.ndarray,'wlm in ops.envelope()')
typetest(fxm,np.ndarray,'fxm in ops.envelope()')
dimtest(wlm,[len(fxm)])
typetest(binsize,[int,float],'binsize in ops.envelope()')
typetest(selfrac,float,'percentage in ops.envelope()')
typetest(mode,str,'mode in envelope')
nantest(fxm,'fxm in ops.envelope()')
nantest(wlm,'wlm in ops.envelope()')
postest(wlm,'wlm in ops.envelope()')
postest(binsize,'binsize in ops.envelope()')
if mode not in ['top','bottom']:
raise Exception(f'RuntimeError in ops.envelope(): Mode should be set to "top" or "bottom" ({mode}).')
binsize=float(binsize)
if mode == 'bottom':
fxm*=-1.0
wlcs=np.array([])#Center wavelengths
fxcs=np.array([])#Flux at center wavelengths
i_start=0
wlm_start=wlm[i_start]
for i in range(0,len(wlm)-1):
if wlm[i]-wlm_start >= binsize:
wlsel = wlm[i_start:i]
fxsel = fxm[i_start:i]
maxsel = fun.selmax(fxsel,selfrac)
wlcs=np.append(wlcs,np.mean(wlsel[maxsel]))
fxcs=np.append(fxcs,np.mean(fxsel[maxsel]))
i_start=i+1
wlm_start=wlm[i+1]
if isinstance(threshold,float) == True:
#This means that the threshold value is set, and we set all bins less than
#that threshold to the threshold value:
if mode == 'bottom':
threshold*=-1.0
fxcs[(fxcs < threshold)] = threshold
if mode == 'bottom':
fxcs*=-1.0
fxm*=-1.0
return wlcs,fxcs
[docs]def normalize_orders(list_of_orders,list_of_sigmas,deg=0,nsigma=4,sinusoid=False):
"""
If deg is set to 1, this function will normalise based on the mean flux in each order.
If set higher, it will remove the average spectrum in each order and fit a polynomial
to the residual. This means that in the presence of spectral lines, the fluxes will be
slightly lower than if def=1 is used. nsigma is only used if deg > 1, and is used to
throw away outliers from the polynomial fit. The program also computes the total
mean flux of each exposure in the time series - totalled over all orders. These
are important to correctly weigh the cross-correlation functions later. The
inter-order colour correction is assumed to be an insignificant modification to
these weights.
Parameters
----------
list_of_orders : list
The list of 2D orders that need to be normalised.
list_of_sigmas : list
The list of 2D error matrices corresponding to the 2D orders that need to be normalised.
deg : int
The polynomial degree to remove. If set to 0, only the average flux is removed. If higher,
polynomial fits are made to the residuals after removal of the average spectrum.
nsigma : int, float
The number of sigmas beyond which outliers are rejected from the polynomial fit.
Only used when deg > 1.
Returns
-------
out_list_of_orders : list
The normalised 2D orders.
out_list_of_sigmas : list
The corresponding errors.
t_weights : np.array
Weight based on the mean flux of each exposure in the time series, averaged over all orders.
"""
import numpy as np
import tayph.functions as fun
from tayph.vartests import dimtest,postest,typetest,notnegativetest
import tayph.util as ut
import warnings
import pdb
typetest(list_of_orders,list,'list_of_orders in ops.normalize_orders()')
typetest(list_of_sigmas,list,'list_of_sigmas in ops.normalize_orders()')
dimtest(list_of_orders[0],[0,0])#Test that the first order is 2D.
dimtest(list_of_sigmas[0],[0,0])#And that the corresponding sigma array is, as well.
n_exp=np.shape(list_of_orders[0])[0]#Get the number of exposures.
for i in range(len(list_of_orders)):#Should be the same for all orders.
dimtest(list_of_orders[i],[n_exp,0])
dimtest(list_of_sigmas[i],np.shape(list_of_orders[i]))
typetest(deg,int,'degree in ops.normalize_orders()')
typetest(nsigma,[int,float],'nsigma in ops.normalize_orders()')
postest(nsigma,'nsigma in ops.normalize_orders()')
notnegativetest(deg,'degree in ops.normalize_orders()')
N = len(list_of_orders)
out_list_of_orders=[]
out_list_of_sigmas=[]
#First compute the exposure-to-exposure flux variations to be used as weights.
t_weights = np.zeros(n_exp)
### suggestions for improvement
"""
m = [np.nanmedian(list_of_orders[i], axis=1) for i in range(N)]
skipped = np.where(np.sum(np.isnan(m)) > 0)[0]
t_weights = np.sum(m[np.sum(np.isnan(m)) <= 0]) / len(m[np.sum(np.isnan(m)) <= 0])
"""
N_i = 0
meanflux = []
for i in range(N):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
m = np.nanmedian(list_of_orders[i],axis=1)#Median or mean? This returns a NaN for each
#row that is all NaNs.
if np.sum(np.isnan(m)) > 0:
ut.tprint(f'---Warning in normalise_orders: Skipping order {i} when calculating total '
'flux in the time-series because many nans are present.')
else:
N_i+=1
t_weights+=m#These contain the exposure-to-exposure variability of the time-series.
#These have length equal to n_exp.
t_weights/=N_i#These are the weights.
if deg == 0:
for i in range(N):
meanflux=np.nanmedian(list_of_orders[i],axis=1) #Average flux in each order as above.
#Median or mean? If this is all NaN for some exposure, then in the out_order this will
#be all NaN, too. Then it should be skipped?
out_list_of_orders.append((list_of_orders[i].T/(meanflux/np.nanmean(meanflux))).T)
out_list_of_sigmas.append((list_of_sigmas[i].T/(meanflux/np.nanmean(meanflux))).T)
else:
for i in range(N):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
meanspec=np.nanmean(list_of_orders[i],axis=0)#Average spectrum in each order.
x=np.array(range(len(meanspec)))
poly_block = list_of_orders[i]*0.0#Array that will host the polynomial fits.
meanspec[meanspec<=0]=np.nan#If there are zeroes, these are set to NaN.
colour = list_of_orders[i]/meanspec#NaNs propagate into here and are ignored below.
for j,s in enumerate(list_of_orders[i]):
idx = np.isfinite(colour[j])
if np.sum(idx) > 0:
p = np.poly1d(np.polyfit(x[idx],colour[j][idx],deg))(x)#Polynomial fit to the
#colour variation.
res = colour[j]/p-1.0 #The residual, which is flat around zero if it's a good
#fit. This has all sorts of line residuals that we need to throw out. We do
#that using the weight keyword of polyfit, and just set all those weights to
#zero.
sigma = np.nanstd(res)
w = x*0.0+1.0#Start with a weight function that is 1.0 everywhere.
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
w[np.abs(res)>nsigma*sigma] = 0.0
if sinusoid == False:
p2 = np.poly1d(np.polyfit(x[idx],colour[j][idx],deg,w=w[idx]))(x)#Second, weighted polynomial fit to the colour variation.
else:
p2 = fun.polysin(x,*fun.polysinfit(x[idx],colour[j][idx],deg,stepsize=1,polyprimer=True,lmfit=True,w=w[idx]))
poly_block[j] = p2
out_list_of_orders.append(list_of_orders[i]/poly_block)
out_list_of_sigmas.append(list_of_sigmas[i]/poly_block)
ut.statusbar(i,N)
return(out_list_of_orders,out_list_of_sigmas,t_weights)
[docs]def bin(x,y,n,err=[]):
"""
A simple function to quickly bin a spectrum by a certain number of points.
Parameters
----------
x : list, np.ndarray
The horizontal axis.
y : list, np.ndarray
The array corresponding to x.
n : int, float
The number of points by which to bin. Int is recommended, and it is converted
to int if a float is provided, with the consequence of the rounding that int() does.
err : list, np.array, optional
If set, errors corresponding to the flux points.
Returns
-------
x_bin : np.array
The binned x-axis.
y_bin : np.array
The binned y-axis.
e_bin : np.array
Only if err is set to an array with non-zero length.
"""
from tayph.vartests import typetest,dimtest,minlength
import numpy as np
typetest(x,[list,np.ndarray],'x in ops.bin()')
typetest(y,[list,np.ndarray],'y in ops.bin()')
typetest(n,[int,float],'n in ops.bin()')
dimtest(x,[0],'x in ops.bin()')
dimtest(y,[len(x)],'y in ops.bin()')
minlength(x,0,'x in ops.bin()')
minlength(x,100,'x in ops.bin()',warning_only=True)
x_bin=[]
y_bin=[]
et=False
if len(err) > 0:
e_bin = []
et = True
for i in range(0,int(len(x)-n-1),n):
x_bin.append(np.nanmean(x[i:i+n]))
y_bin.append(np.nanmean(y[i:i+n]))
if et:
e_bin.append(np.sqrt(np.nansum(err[i:i+n]**2))/len(err[i:i+n]))
if et:
return(np.array(x_bin),np.array(y_bin),np.array(e_bin))
else:
return(np.array(x_bin),np.array(y_bin))
[docs]def bin_avg(wl,fx,wlm):
"""
A simple function to bin a high-res model spectrum (wl,fx) to a lower resolution wavelength grid (wlm),
by averaging the points fx inside the lower-resolution wavelength bins set by wlm.
Parameters
----------
wl : list, np.ndarray
The horizontal axis.
fx : list, np.ndarray
The array corresponding to x.
wlm : list, np.ndarray
The lower resolution wlm array. wlm should have fewer points than wl and fx.
Returns
-------
y_bin : np.array
The binned flux points.
"""
import numpy as np
import sys
from tayph.vartests import typetest,dimtest,minlength
import matplotlib.pyplot as plt
import tayph.util as ut
typetest(wl,[list,np.ndarray],'wl in ops.bin_avg()')
typetest(fx,[list,np.ndarray],'fx in ops.bin_avg()')
typetest(wlm,[list,np.ndarray],'wlm in ops.bin_avg()')
dimtest(wl,[0],'wl in ops.bin_avg()')
dimtest(fx,[len(fx)],'fx in ops.bin_avg()')
minlength(wl,0,'wl in ops.bin_avg()')
minlength(wl,50,'wl in ops.bin_avg()',warning_only=True)
minlength(wlm,0,'wlm in ops.bin_avg()')
minlength(wlm,100,'wlm in ops.bin_avg()',warning_only=True)
if len(wl) < len(wlm):
raise ValueError(f"Error in bin_avg: The input grid should have (many) more points than the requested grid of interpolates ({len(wl)}, {len(wlm)}).")
if max(wl) < min(wlm) or min(wl) > max(wlm):
raise ValueError(f"Error in bin_avg: the supplied wl and wlm arrays have no overlap. Are the wavelength units the same? (mean(wl)={np.mean(wlm)}, mean(wlm)={np.mean(wlm)}).")
dwl_start = wlm[1]-wlm[0]
wlm_borders=[wlm[0]-dwl_start/2.0]
for i in range(len(wlm)-1):
dwl=wlm[i+1]-wlm[i]
if i == 0:
wlm_borders=[wlm[0]-dwl/2.0]
wlm_borders.append(wlm[i]+dwl/2.0)
wlm_borders.append(wlm[-1]+dwl/2.0)
fxm = []
for i in range(len(wlm_borders)-1):
#This indexing is slow. But I always only need to compute it once!
fx_sel = fx[(wl > wlm_borders[i])&(wl <= wlm_borders[i+1])]
if len(fx_sel) == 0:
raise RuntimeError(f"Error in bin_avg: No points were selected in step {i}. Wlm_borders has a smaller step size than fx?")
else:
fxbin=np.mean(fx[(wl > wlm_borders[i])&(wl <= wlm_borders[i+1])])
fxm.append(fxbin)
return(fxm)
[docs]def convolve(array,kernel,edge_degree=1,fit_width=2):
"""It's unbelievable, but I could not find the python equivalent of IDL's
/edge_truncate keyword, which truncates the kernel at the edge of the convolution.
Therefore, unfortunately, I need to code a convolution operation myself.
Stand by to be slowed down by an order of magnitude #thankspython.
Nope! Because I can just use np.convolve for most of the array, just not the edge...
So the strategy is to extrapolate the edge of the array using a polynomial fit
to the edge elements. By default, I fit over a range that is twice the length of the kernel; but
this value can be modified using the fit_width parameter.
In rare cases, the polynomial fit doesn't converge. In this case, the fit_width is automatically
increased by one (see Issue #99).
This function assumes that array is free of NaN values.
Parameters
----------
array : list, np.ndarray
The horizontal axis.
kernel : list, np.ndarray
The convolution kernel. It is required to have a length that is less than 25% of the size of the array.
edge_degree : int
The polynomial degree by which the array is extrapolated in order to
fit_width : int
The length of the area at the edges of array used to fit the polynomial, in units of the length of the kernel.
Increase this number for small kernels or noisy arrays.
Returns
-------
array_convolved : np.array
The input array convolved with the kernel
Example
-------
>>> import numpy as np
>>> a=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
>>> b=[-0.5,0,0.5]
>>> c=convolve(a,b,edge_degree=1)
"""
import numpy as np
import tayph.functions as fun
from tayph.vartests import typetest,postest,dimtest,nantest
typetest(edge_degree,int,'edge_degree in ops.convolve()')
typetest(fit_width,int,'edge_degree in ops.convolve()')
typetest(array,[list,np.ndarray])
typetest(kernel,[list,np.ndarray])
dimtest(array,[0],'array in ops.convolve()')
dimtest(kernel,[0],'array in ops.convolve()')
postest(edge_degree,'edge_degree in ops.convolve()')
postest(fit_width,'edge_degree in ops.convolve()')
nantest(array,'input array in ops.convolve()')
nantest(kernel,'input kernel in ops.convolve()')
array = np.array(array)
kernel= np.array(kernel)
if len(kernel) >= len(array)/(fit_width*2):
raise Exception(f"Error in ops.convolve(): Kernel length is larger than 1/{fit_width*2} of "
f"the array ({len(kernel)}, {len(array)}). Can't extrapolate over that length to "
"approximate the edges. And you probably don't want to be doing a convolution like that, "
"anyway. Please rerun with a smaller kernel or a wider array. If you are getting this "
"error while cleaning the CCF's (in tayph.ccf.filter_ccf, e.g. during a normal tayph run), "
"increase the RVrange configuration parameter to make this error go away.")
if len(kernel) % 2 != 1:
raise Exception('Error in ops.convolve(): Kernel needs to have an odd number of elements.')
#This should never be triggered in normal Tayph usage, but may happen if you use smooth
#or convolve separately.
#Prepare to perform polynomial fits at the edges.
x= np.arange(len(array), dtype=float)
#Add a try-except block here to catch a rare rank error that may occur. The fit is very simple
#and low-order, so this shouldn't happen, but still. Respond by increasing the fit width to
#dislodge it.
try:
fit_left=np.polyfit(x[0:len(kernel)*fit_width],array[0:len(kernel)*fit_width],edge_degree)
fit_right=np.polyfit(x[-1*fit_width*len(kernel)-1:],array[-1*fit_width*len(kernel)-1:],
edge_degree)
except:
fit_width+=1
if len(kernel) >= len(array)/(fit_width*2):
raise Exception(f"Error in ops.convolve(): Because a rank-error occured in the edge-"
f"extrapolation, the extrapolation fit_width was automatically increased by 1.0 to "
f"{fit_width}. This has made it larger than 1/{fit_width*2} of the array, which is "
"too large. Please rerun with a smaller kernel or a wider array.")
try:
fit_left=np.polyfit(x[0:len(kernel)*fit_width],array[0:len(kernel)*fit_width],
edge_degree)
fit_right=np.polyfit(x[-1*fit_width*len(kernel)-1:],array[-1*fit_width*len(kernel)-1:],
edge_degree)
#IF A RANK-ERROR IS STILL TRIGGERED HERE, MORE MITIGATION WILL BE NEEDED. WE FIT A LINE
#INSTEAD, WITH A DIFFERENT ALGORITHM.
except:
print('------WARNING in ops.convolve(). Edge extrapolation of the array failed twice. '
'Extrapolating the edges with scipy.stats.linregress instead. Is your data looking OK?')
from scipy.stats import linregress
linfit_left = linregress(x[0:len(kernel)*fit_width],array[0:len(kernel)*fit_width])
linfit_right = linregress(x[-1*fit_width*len(kernel)-1:],
array[-1*fit_width*len(kernel)-1:])
fit_left = np.array([linfit_left[0],linfit_left[1]])
fit_right = np.array([linfit_right[0],linfit_right[1]])
#Pad both the x-grid (onto which the polynomial is defined)
#and the data array.
pad= np.arange((len(kernel)-1)/2, dtype=float) #fun.findgen((len(kernel)-1)/2)
left_pad=pad-(len(kernel)-1)/2
right_pad=np.max(x)+pad+1
left_array_pad=np.polyval(fit_left,left_pad)
right_array_pad=np.polyval(fit_right,right_pad)
#Perform the padding.
x_padded = np.append(left_pad , x)
x_padded = np.append(x_padded , right_pad) #Pad the array with the missing elements of the kernel at the edge.
array_padded = np.append(left_array_pad,array)
array_padded = np.append(array_padded,right_array_pad)
#Reverse the kernel because np.convol does that automatically and I don't want that.
#(Imagine doing a derivative with a kernel [-1,0,1] and it gets reversed...)
kr = kernel[::-1]
#The valid keyword effectively undoes the padding, leaving only those values for which the kernel was entirely in the padded array.
#This thus again has length equal to len(array).
return np.convolve(array_padded,kr,'valid')
[docs]def derivative(x):
"""
This computes the simple numerical derivative of x by convolving with kernel [-1,0,1].
Parameters
----------
x : list, np.ndarray
The array from which the derivative is required.
Returns
-------
derivative : np.array
The numerical derivative of x.
Example
-------
>>> import numpy as np
>>> x=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
>>> dx=derivative(x)
"""
import numpy as np
from tayph.vartests import typetest,dimtest
typetest(x,[list,np.ndarray],'x in derivative()')
dimtest(x,[0],'x in derivative()')
x=np.array(x)
d_kernel=np.array([-1,0,1])/2.0
return(convolve(x,d_kernel,fit_width=3))
[docs]def constant_velocity_wl_grid(wl,fx,oversampling=1.0,minmax=[0,np.inf],assume_sorted=False,verbose=False):
"""This function will define a constant-velocity grid that is (optionally)
sampled a number of times finer than the SMALLEST velocity difference that is
currently in the grid.
Example: wl_cv,fx_cv = constant_velocity_wl_grid(wl,fx,oversampling=1.5).
This function is hardcoded to raise an exception if wl or fx contain NaNs,
because interp1d does not handle NaNs.
Parameters
----------
wl : list, np.ndarray
The wavelength array to be resampled.
fx : list, np.ndarray
The flux array to be resampled.
oversampling : float
The factor by which the wavelength array is *minimally* oversampled.
Returns
-------
wl : np.array
The new wavelength grid.
fx : np.array
The interpolated flux values.
a : float
The velocity step in km/s.
"""
import astropy.constants as consts
import numpy as np
import tayph.functions as fun
from tayph.vartests import typetest,nantest,dimtest,postest
from scipy import interpolate
import pdb
import matplotlib.pyplot as plt
import tayph.util as ut
typetest(oversampling,[int,float],'oversampling in constant_velocity_wl_grid()',)
typetest(wl,[list,np.ndarray],'wl in constant_velocity_wl_grid()')
typetest(fx,[list,np.ndarray],'fx in constant_velocity_wl_grid()')
nantest(wl,'wl in in constant_velocity_wl_grid()')
nantest(fx,'fx in constant_velocity_wl_grid()')
dimtest(wl,[0],'wl in constant_velocity_wl_grid()')
dimtest(fx,[len(wl)],'fx in constant_velocity_wl_grid()')
postest(oversampling,'oversampling in constant_velocity_wl_grid()')
oversampling=float(oversampling)
wl=np.array(wl)
fx=np.array(fx)
c=consts.c.to('km/s').value
minwl = np.max([minmax[0],np.min(wl)])
maxwl = np.min([minmax[1],np.max(wl)])
if verbose: ut.tprint(f'Sub-selecting wavelengths to {minwl}-{maxwl}')
wl_sel = wl[(wl>=minwl)&(wl<=maxwl)]
fx_sel = fx[(wl>=minwl)&(wl<=maxwl)]
if verbose: ut.tprint(f'The length of the wl array is now {len(wl_sel)} (was {len(wl)})')
if verbose: ut.tprint('Computing derivative.')
dl=derivative(wl_sel)
dv=dl/wl_sel*c
a=np.min(np.abs(dv)/oversampling)
if verbose: ut.tprint(f'dv = {a*oversampling}')
wl_new=0.0
#The following while loop will define the new pixel grid.
#It starts trying 100,000 points, and if that's not enough to cover the entire
#range from min(wl) to max(wl), it will add 100,000 more; until it's enough.
if verbose: ut.tprint(f'Building up new wavelength array until {maxwl}')
n=len(wl_sel)
while np.max(wl_new) < maxwl:
x=np.arange(n, dtype=float) #fun.findgen(n)
wl_new=np.exp(a/c * x)*minwl
n+=len(wl_sel)
wl_new[0]=minwl#Artificially set to zero to avoid making a small round
#off error in that exponent.
#Then at the end we crop the part that goes too far:
wl_new_cropped=wl_new[(wl_new < np.max(wl_sel))&(wl_new>np.min(wl_sel))]
x_cropped=x[(wl_new < maxwl)]
if verbose:
ut.tprint(f'Interpolating {len(wl_new_cropped)} values onto the selected '
f'wavelength array with length {len(wl_sel)}')
ut.tprint(f'Boundaries of new wavelength: {np.min(wl_new_cropped)}-{np.max(wl_new_cropped)}')
ut.tprint(f'Boundaries of selected wavelengths: {np.min(wl_sel)}-{np.max(wl_sel)}')
i_fx = interpolate.interp1d(wl_sel,fx_sel,assume_sorted=assume_sorted)
fx_new_cropped =i_fx(wl_new_cropped)
return(wl_new_cropped,fx_new_cropped,a)
[docs]def blur_rotate(wl,order,dv,Rp,P,inclination,status=False,fast=False):
"""This function takes a spectrum and blurs it using a rotation x Gaussian
kernel which has a FWHM width of dv km/s everywhere. Meaning that its width changes
dynamically.
Because the kernel needs to be recomputed on each element of the wavelength axis
individually, this operation is much slower than convolution with a constant kernel,
in which a simple shifting of the array, rather than a recomputation of the rotation
profile is sufficient. By setting the fast keyword, the input array will first
be oversampled onto a constant-velocity grid to enable the usage of a constant kernel,
after which the result is interpolated back to the original grid.
Input:
The wavelength axis wl.
The spectral axis order.
The FHWM width of the resolution element in km/s.
The Radius of the rigid body in Rj.
The periodicity of the rigid body rotation in days.
The inclination of the spin axis in degrees.
Wavelength and order need to be numpy arrays and have the same number of elements.
Rp, P and i need to be scalar floats.
Output:
The blurred spectral axis, with the same dimensions as wl and order.
WARNING: THIS FUNCTION HANDLES NANS POORLY. I HAVE THEREFORE DECIDED CURRENTLY
TO REQUIRE NON-NAN INPUT.
This computes the simple numerical derivative of x by convolving with kernel [-1,0,1].
Parameters
----------
wl : list, np.ndarray
The wavelength array.
order : list, np.ndarray.
The spectral axis.
dv: float
The FWHM of a resolution element in km/s.
Rp: float
The radius of the planet in jupiter radii.
P: float
The rotation period of the planet. For tidally locked planets, this is equal
to the orbital period.
inclination:
The inclination of the spin axis in degrees. Presumed to be close to 90 degrees
for transiting planets
status: bool
Output a statusbar, but only if fast == False.
fast: bool
Re-interpolate the input on a constant-v grid in order to speed up the computation
of the convolution by eliminating the need to re-interpolate the kernel every step.
Returns
-------
order_blurred : np.array
The rotation-broadened spectrum on the same wavelength grid as the input.
Example
-------
>>> import tayph.functions as fun
>>> wl = fun.findgen(4000)*0.001+500.0
>>> fx = wl*0.0
>>> fx[2000] = 1.0
>>> fx_blurred1 = blur_rotate(wl,fx,3.0,1.5,0.8,90.0,status=False,fast=False)
>>> fx_blurred2 = blur_rotate(wl,fx,3.0,1.5,0.8,90.0,status=False,fast=True)
"""
import numpy as np
import tayph.util as ut
import tayph.functions as fun
from tayph.vartests import typetest,nantest,dimtest
from matplotlib import pyplot as plt
import astropy.constants as const
import astropy.units as u
import time
import sys
import pdb
from scipy import interpolate
typetest(dv,float,'dv in blur_rotate()')
typetest(wl,[list,np.ndarray],'wl in blur_rotate()')
typetest(order,[list,np.ndarray],'order in blur_rotate()')
typetest(P,float,'P in blur_rotate()')
typetest(Rp,float,'Rp in blur_rotate()')
typetest(inclination,float,'inclination in blur_rotate()')
typetest(status,bool,'status in blur_rotate()')
typetest(fast,bool,'fast in blur_rotate()')
nantest(wl,'dv in blur_rotate()')
nantest(order,'order in blur_rotate()')
dimtest(wl,[0],'wl in blur_rotate()')
dimtest(order,[len(wl)],'order in blur_rotate()')#Test that wl and order are 1D, and that
#they have the same length.
if np.min(np.array([dv,P,Rp])) <= 0.0:
raise Exception("ERROR in blur_rotate: dv, P and Rp should be strictly positive.")
#ut.typetest_array('wl',wl,np.float64)
#ut.typetest_array('order',order,np.float64)
#This is not possible because order may be 2D...
#And besides, you can have floats, np.float32 and np.float64... All of these would
#need to pass. Need to fix typetest_array some day.
order_blurred=order*0.0#init the output.
truncsize=5.0#The gaussian is truncated at 5 sigma from the extremest points of the RV amplitude.
sig_dv = dv / (2*np.sqrt(2.0*np.log(2))) #Transform FWHM to Gaussian sigma. In km/s.
deriv = derivative(wl)
if max(deriv) < 0:
raise Exception("ERROR in ops.blur_rotate: WL derivative is smaller than 1.0. Sort wl in ascending order.")
sig_wl=wl*sig_dv/(const.c.to('km/s').value)#in nm
sig_px=sig_wl/deriv
n=1000.0
a=np.arange(n, dtype=float)/(n-1)*np.pi #fun.findgen(n)/(n-1)*np.pi
rv=np.cos(a)*np.sin(np.radians(inclination))*(2.0*np.pi*Rp*const.R_jup/(P*u.day)).to('km/s').value #in km/s
trunc_dist=np.round(sig_px*truncsize+np.max(rv)*wl/(const.c.to('km/s').value)/deriv).astype(int)
# print('Maximum rotational rv: %s' % max(rv))
# print('Sigma_px: %s' % np.nanmean(np.array(sig_px)))
rvgrid_max=(np.max(trunc_dist)+1.0)*sig_dv+np.max(rv)
rvgrid_n=rvgrid_max / dv * 100.0 #100 samples per lsf fwhm.
rvgrid=np.arange(-rvgrid_n, rvgrid_n, dtype=float)/rvgrid_n*rvgrid_max #(fun.findgen(2*rvgrid_n+1)-rvgrid_n)/rvgrid_n*rvgrid_max#Need to make sure that this is wider than the truncation bin and more finely sampled than wl - everywhere.
lsf=rvgrid*0.0
#We loop through velocities in the velocity grid to build up the sum of Gaussians
#that is the LSF.
for v in rv:
lsf+=fun.gaussian(rvgrid,1.0,v,sig_dv)#This defines the LSF on a velocity grid wih high fidelity.
if fast:
wlt,fxt,dv = constant_velocity_wl_grid(wl,order,4)
dv_grid = rvgrid[1]-rvgrid[0]
len_rv_grid_low = int(max(rvgrid)/dv*2-2)
# print(len_rv_grid_low)
# print(len(fun.findgen(len_rv_grid_low)))
# print(len_rv_grid_low%2)
if len_rv_grid_low%2 == 0:
len_rv_grid_low -= 1
rvgrid_low = np.arange(len_rv_grid_low)*dv #fun.findgen(len_rv_grid_low)*dv#Slightly smaller than the original grid.
rvgrid_low -=0.5*np.max(rvgrid_low)
lsf_low = interpolate.interp1d(rvgrid,lsf)(rvgrid_low)
lsf_low /=np.sum(lsf_low)#This is now an LSF on a grid with the same spacing as the data has.
#This means I can use it directly as a convolution kernel:
fxt_blurred = convolve(fxt,lsf_low,edge_degree=1,fit_width=1)
#And interpolate back to where it came from:
order_blurred = interpolate.interp1d(wlt,fxt_blurred,bounds_error=False)(wl)
#I can use interp1d because after blurring, we are now oversampled.
# order_blurred2 = bin_avg(wlt,fxt_blurred,wl)
return(order_blurred)
#Now we loop through the wavelength grid to place this LSF at each wavelength position.
for i in range(0,len(wl)):
binstart=max([0,i-trunc_dist[i]])
binend=i+trunc_dist[i]
wlbin=wl[binstart:binend]
wlgrid = wl[i]*rvgrid/(const.c.to('km/s').value)+wl[i]#This converts the velocity grid to a d-wavelength grid centered on wk[i]
#print([np.min(wlbin),np.min(wlgrid),np.max(wlbin),np.max(wlgrid)])
i_wl = interpolate.interp1d(wlgrid,lsf,bounds_error=False,fill_value='extrapolate')#Extrapolate should not be necessary but sometimes there is a minute mismatch between the
#start and end wavelengths of the constructed grid and the bin.
try:
lsf_wl=i_wl(wlbin)
except:
ut.tprint('Error in interpolating LSF onto wlbin. Pausing to debug.')
pdb.set_trace()
k_n=lsf_wl/np.sum(lsf_wl)#Normalize at each instance of the interpolation to make sure flux is conserved exactly.
order_blurred[i]=np.sum(k_n*order[binstart:binend])
if status == True:
ut.statusbar(i,len(wl))
return(order_blurred)
[docs]def airtovac(wlnm):
"""
This converts air wavelengths to vaccuum wavelengths.
Parameters
----------
wlnm : float, np.ndarray
The wavelength that is to be converted.
Returns
-------
wlnm : float, np.array
wavelengths in vaccuum.
Example
-------
>>> import numpy as np
>>> wlA=np.array([500.0,510.0,600.0,700.0])
>>> wlV=airtovac(wlA)
"""
import numpy as np
from tayph.vartests import typetest
typetest(wlnm,[float,np.ndarray],'wlmn in airtovac()')
wlA=wlnm*10.0
s = 1e4 / wlA
n = 1 + (0.00008336624212083 + 0.02408926869968 / (130.1065924522 - s**2) +
0.0001599740894897 / (38.92568793293 - s**2))
return(wlA*n/10.0)
[docs]def vactoair(wlnm):
"""
This converts vaccuum wavelengths to air wavelengths.
Parameters
----------
wlnm : float, np.ndarray
The wavelength that is to be converted.
Returns
-------
wlnm : float, np.array
wavelengths in air.
Example
-------
>>> import numpy as np
>>> wlV=np.array([500.0,510.0,600.0,700.0])
>>> wlA=vactoair(wlV)
"""
import numpy as np
from tayph.vartests import typetest
typetest(wlnm,[float,np.ndarray],'wlmn in vactoair()')
wlA = wlnm*10.0
s = 1e4/wlA
f = 1.0 + 5.792105e-2/(238.0185e0 - s**2) + 1.67917e-3/( 57.362e0 - s**2)
return(wlA/f/10.0)
[docs]def clean_block(wl,block,deg=0,w=200,nsigma=5.0,verbose=False,renorm=True,parallel=False):
"""This quickly cleans a spectral block by performing trimming of zeroes at the edges,
setting zeroes and negative values to NaN, normalising the flux along both axes, rejecting
outlier values by using a running MAD, and optionally detrending using polynomials. This is
intended for quick-look cross-correlation when reading in the data.
The output is the trimmed wavelength axis, the outlier-rejected sequence of spectra (block),
and the residuals after outlier rejection. If detrend set to True, polynomials will be fit, and
the residuals after polyfitting are also returned. So setting detrend=True increases the number
of output variables from 3 to 4.
I've ran mad trying to make this faster but medians simply don't collapse into nice matrix
operations. So what remains is to parallelise running_MAD_2D. The parallel keyword in
clean_block directly propagates into running_MAD_@D If clean_block is called once on a wide
array (e.g. stitched spectra with ~1e5 points, paralellisation achieves a factor 2 speedup on my
old 8-core laptop). However, if clean_block is called on a sequence of less wide arrays (e.g.)
spectral orders, it's better to do that loop in parallel rather than call clean_block with
parallel itself.
Set the renorm keyword to maintain the average flux in the block. If set to False, this
function will return spectra that have an average of 1.0
"""
import numpy as np
import tayph.functions as fun
import warnings
import copy
import tayph.util as ut
block[np.abs(block)<1e-10*np.nanmedian(block)]=0.0#First set very small values to zero.
sum=np.nansum(np.abs(block),axis=0)#Anything that is zero in this sum (i.e. the all-zero
#columns) will be identified.
npx=len(sum)
#THIS REMOVAL OF PADDED PIXELS MAY NEED TO BE REMOVED IF 2D FIXING OF WAVELENGTH SOLUTIONS IS TO
#OCCUR! ALSO IN READ_E2DS!
leftsize=npx-len(np.trim_zeros(sum,'f'))#Size of zero-padding on the left.
rightsize=npx-len(np.trim_zeros(sum,'b'))#Size of zero-padding on the right
wl_trimmed=wl[leftsize:npx-rightsize-1]
block = block[:,leftsize:npx-rightsize-1]
block[block<=0]=np.nan #We get rid of remaining zeroes and negative values:
#Compute residuals of s1d spectra by double-normalisation:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
avg_flux=np.nanmean(block,axis=1)
block = np.transpose(np.transpose(block)/avg_flux)
block_avg=np.nanmean(block,axis=0)
r=block/block_avg
if renorm==False:
avg_flux=avg_flux*0.0+1.0
#Outlier rejection in the 2D residuals via a running MAD:
MAD=fun.running_MAD_2D(r,w,verbose=verbose,parallel=parallel)#This can take long.
warnings.simplefilter("ignore")
r[np.abs(r-np.nanmean(r))/MAD>nsigma]=np.nan
block[np.abs(r-np.nanmean(r))/MAD>nsigma]=np.nan
if deg>0:
r2=copy.deepcopy(r)
for j,s in enumerate(r):
nansel=np.isnan(s)
p=np.polyfit(wl_trimmed[~nansel],s[~nansel],deg)
block[j]/=np.polyval(p,wl_trimmed)
r2[j]/=np.polyval(p,wl_trimmed)
if verbose: ut.statusbar(j,len(r))
return(wl_trimmed,np.transpose(np.transpose(block)*avg_flux),r,r2)#Avg_flux is 1.0 unless
#renorm=True.
return(wl_trimmed,np.transpose(np.transpose(block)*avg_flux),r)