Source code for pyreduce.wavelength_calibration

# -*- coding: utf-8 -*-
"""
Wavelength Calibration
by comparison to a reference spectrum
Loosely bases on the IDL wavecal function
"""

import logging
from os.path import dirname, join

import corner
import emcee
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from numpy.polynomial.polynomial import Polynomial, polyval2d
from scipy import signal
from scipy.constants import speed_of_light
from scipy.interpolate import interp1d
from scipy.ndimage.filters import gaussian_filter1d
from scipy.ndimage.morphology import grey_closing
from scipy.optimize import curve_fit
from tqdm import tqdm

from . import util

logger = logging.getLogger(__name__)


[docs]def polyfit(x, y, deg): res = Polynomial.fit(x, y, deg, domain=[]) coef = res.coef[::-1] return coef
[docs]class AlignmentPlot: """ Makes a plot which can be clicked to align the two spectra, reference and observed """ def __init__(self, ax, obs, lines, offset=(0, 0), plot_title=None): self.im = ax self.first = True self.nord, self.ncol = obs.shape self.RED, self.GREEN, self.BLUE = 0, 1, 2 self.obs = obs self.lines = lines self.plot_title = plot_title self.order_first = 0 self.spec_first = "" self.x_first = 0 self.offset = list(offset) self.make_ref_image()
[docs] def make_ref_image(self): """create and show the reference plot, with the two spectra""" ref_image = np.zeros((self.nord * 2, self.ncol, 3)) for iord in range(self.nord): ref_image[iord * 2, :, self.RED] = 10 * np.ma.filled(self.obs[iord], 0) if 0 <= iord + self.offset[0] < self.nord: for line in self.lines[self.lines["order"] == iord]: first = int(np.clip(line["xfirst"] + self.offset[1], 0, self.ncol)) last = int(np.clip(line["xlast"] + self.offset[1], 0, self.ncol)) order = (iord + self.offset[0]) * 2 + 1 ref_image[order, first:last, self.GREEN] = ( 10 * line["height"] * signal.gaussian(last - first, line["width"]) ) ref_image = np.clip(ref_image, 0, 1) ref_image[ref_image < 0.1] = 0 self.im.imshow( ref_image, aspect="auto", origin="lower", extent=(-0.5, self.ncol - 0.5, -0.5, self.nord - 0.5), ) title = "Alignment, Observed: RED, Reference: GREEN\nGreen should be above red!" if self.plot_title is not None: title = f"{self.plot_title}\n{title}" self.im.figure.suptitle(title) self.im.axes.set_xlabel("x [pixel]") self.im.axes.set_ylabel("Order") self.im.figure.canvas.draw()
[docs] def connect(self): """connect the click event with the appropiate function""" self.cidclick = self.im.figure.canvas.mpl_connect( "button_press_event", self.on_click )
[docs] def on_click(self, event): """On click offset the reference by the distance between click positions""" if event.ydata is None: return order = int(np.floor(event.ydata)) spec = "ref" if (event.ydata - order) > 0.5 else "obs" # if True then reference x = event.xdata print("Order: %i, Spectrum: %s, x: %g" % (order, "ref" if spec else "obs", x)) # on every second click if self.first: self.first = False self.order_first = order self.spec_first = spec self.x_first = x else: # Clicked different spectra if spec != self.spec_first: self.first = True direction = -1 if spec == "ref" else 1 offset_orders = int(order - self.order_first) * direction offset_x = int(x - self.x_first) * direction self.offset[0] -= offset_orders - 1 self.offset[1] -= offset_x self.make_ref_image()
[docs]class LineAtlas: def __init__(self, element, medium="vac"): self.element = element self.medium = medium fname = element.lower() + ".fits" folder = dirname(__file__) self.fname = join(folder, "wavecal/atlas", fname) self.wave, self.flux = self.load_fits(self.fname) try: # If a specific linelist file is provided fname_list = element.lower() + "_list.txt" self.fname_list = join(folder, "wavecal/atlas", fname_list) linelist = np.genfromtxt(self.fname_list, dtype="f8,U8") wpos, element = linelist["f0"], linelist["f1"] indices = self.wave.searchsorted(wpos) heights = self.flux[indices] self.linelist = np.rec.fromarrays( [wpos, heights, element], names=["wave", "heights", "element"] ) except (FileNotFoundError, IOError): # Otherwise fit the line positions from the spectrum logger.warning( "No dedicated linelist found for %s, determining peaks based on the reference spectrum instead.", element, ) module = WavelengthCalibration(plot=False) n, peaks = module._find_peaks(self.flux) wpos = np.interp(peaks, np.arange(len(self.wave)), self.wave) element = np.full(len(wpos), element) indices = self.wave.searchsorted(wpos) heights = self.flux[indices] self.linelist = np.rec.fromarrays( [wpos, heights, element], names=["wave", "heights", "element"] ) # The data files are in vaccuum, if the instrument is in air, we need to convert if medium == "air": self.wave = util.vac2air(self.wave) self.linelist["wave"] = util.vac2air(self.linelist["wave"])
[docs] def load_fits(self, fname): hdu = fits.open(fname) if len(hdu) == 1: # Its just the spectrum # with the wavelength defined via the header keywords header = hdu[0].header spec = hdu[0].data.ravel() wmin = header["CRVAL1"] wdel = header["CDELT1"] wave = np.arange(spec.size) * wdel + wmin else: # Its a binary Table, with two columns for the wavelength and the # spectrum data = hdu[1].data wave = data["wave"] spec = data["spec"] spec /= np.nanmax(spec) spec = np.clip(spec, 0, None) return wave, spec
[docs]class LineList: dtype = np.dtype( ( np.record, [ (("wlc", "WLC"), ">f8"), # Wavelength (before fit) (("wll", "WLL"), ">f8"), # Wavelength (after fit) (("posc", "POSC"), ">f8"), # Pixel Position (before fit) (("posm", "POSM"), ">f8"), # Pixel Position (after fit) (("xfirst", "XFIRST"), ">i2"), # first pixel of the line (("xlast", "XLAST"), ">i2"), # last pixel of the line ( ("approx", "APPROX"), "O", ), # Not used. Describes the shape used to approximate the line. "G" for Gaussian (("width", "WIDTH"), ">f8"), # width of the line in pixels (("height", "HEIGHT"), ">f8"), # relative strength of the line (("order", "ORDER"), ">i2"), # echelle order the line is found in ("flag", "?"), # flag that tells us if we should use the line or not ], ) ) def __init__(self, lines=None): if lines is None: lines = np.array([], dtype=self.dtype) self.data = lines self.dtype = self.data.dtype def __getitem__(self, key): return self.data[key] def __setitem__(self, key, value): self.data[key] = value def __len__(self): return len(self.data)
[docs] @classmethod def load(cls, filename): data = np.load(filename, allow_pickle=True) linelist = cls(data["cs_lines"]) return linelist
[docs] def save(self, filename): np.savez(filename, cs_lines=self.data)
[docs] def append(self, linelist): if isinstance(linelist, LineList): linelist = linelist.data self.data = np.append(self.data, linelist)
[docs] def add_line(self, wave, order, pos, width, height, flag): lines = self.from_list([wave], [order], [pos], [width], [height], [flag]) self.data = np.append(self.data, lines)
[docs] @classmethod def from_list(cls, wave, order, pos, width, height, flag): lines = [ (w, w, p, p, p - wi / 2, p + wi / 2, b"G", wi, h, o, f) for w, o, p, wi, h, f in zip(wave, order, pos, width, height, flag) ] lines = np.array(lines, dtype=cls.dtype) return cls(lines)
[docs]class WavelengthCalibration: """ Wavelength Calibration Module Takes an observed wavelength image and the reference linelist and returns the wavelength at each pixel """ def __init__( self, threshold=100, degree=(6, 6), iterations=3, dimensionality="2D", nstep=0, shift_window=0.01, manual=False, polarim=False, lfc_peak_width=3, closing=5, element=None, medium="vac", plot=True, plot_title=None, ): #:float: Residual threshold in m/s above which to remove lines self.threshold = threshold #:tuple(int, int): polynomial degree of the wavelength fit in (pixel, order) direction self.degree = degree if dimensionality == "1D": self.degree = int(degree) elif dimensionality == "2D": self.degree = (int(degree[0]), int(degree[1])) #:int: Number of iterations in the remove residuals, auto id, loop self.iterations = iterations #:{"1D", "2D"}: Whether to use 1d or 2d fit self.dimensionality = dimensionality #:bool: Whether to fit for pixel steps (offsets) in the detector self.nstep = nstep #:float: Fraction if the number of columns to use in the alignment of individual orders. Set to 0 to disable self.shift_window = shift_window #:bool: Whether to manually align the reference instead of using cross correlation self.manual = manual #:bool: Whether to use polarimetric orders instead of the usual ones. I.e. Each pair of two orders represents the same data. Not Supported yet self.polarim = polarim #:int: Whether to plot the results. Set to 2 to plot during all steps. self.plot = plot self.plot_title = plot_title #:str: Elements used in the wavelength calibration. Used in AutoId to find more lines from the Atlas self.element = element #:str: Medium of the detector, vac or air self.medium = medium #:int: Laser Frequency Peak width (for scipy.signal.find_peaks) self.lfc_peak_width = lfc_peak_width #:int: grey closing range for the input image self.closing = 5 #:int: Number of orders in the observation self.nord = None #:int: Number of columns in the observation self.ncol = None @property def step_mode(self): return self.nstep > 0 @property def dimensionality(self): """{"1D", "2D"}: Whether to use 1D or 2D polynomials for the wavelength solution""" return self._dimensionality @dimensionality.setter def dimensionality(self, value): accepted_values = ["1D", "2D"] if value in accepted_values: self._dimensionality = value else: raise ValueError( f"Value for 'dimensionality' not understood. Expected one of {accepted_values} but got {value} instead" )
[docs] def normalize(self, obs, lines): """ Normalize the observation and reference list in each order individually Copies the data if the image, but not of the linelist Parameters ---------- obs : array of shape (nord, ncol) observed image lines : recarray of shape (nlines,) reference linelist Returns ------- obs : array of shape (nord, ncol) normalized image lines : recarray of shape (nlines,) normalized reference linelist """ # normalize order by order obs = np.ma.copy(obs) for i in range(len(obs)): if self.closing > 0: obs[i] = grey_closing(obs[i], self.closing) try: obs[i] -= np.ma.median(obs[i][obs[i] > 0]) except ValueError: logger.warning( f"Could not determine the minimum value in order %i. No positive values found", i, ) obs[i] /= np.ma.max(obs[i]) # Remove negative outliers std = np.std(obs, axis=1)[:, None] obs[obs <= -2 * std] = np.ma.masked # obs[obs <= 0] = np.ma.masked # Normalize lines in each order for order in np.unique(lines["order"]): select = lines["order"] == order topheight = np.max(lines[select]["height"]) lines["height"][select] /= topheight return obs, lines
[docs] def create_image_from_lines(self, lines): """ Create a reference image based on a line list Each line will be approximated by a Gaussian Space inbetween lines is 0 The number of orders is from 0 to the maximum order Parameters ---------- lines : recarray of shape (nlines,) line data Returns ------- img : array of shape (nord, ncol) New reference image """ min_order = int(np.min(lines["order"])) max_order = int(np.max(lines["order"])) img = np.zeros((max_order - min_order + 1, self.ncol)) for line in lines: if line["order"] < 0: continue if line["xlast"] < 0 or line["xfirst"] > self.ncol: continue first = int(max(line["xfirst"], 0)) last = int(min(line["xlast"], self.ncol)) img[int(line["order"]) - min_order, first:last] = line[ "height" ] * signal.gaussian(last - first, line["width"]) return img
[docs] def align_manual(self, obs, lines): """ Open an AlignmentPlot window for manual selection of the alignment Parameters ---------- obs : array of shape (nord, ncol) observed image lines : recarray of shape (nlines,) reference linelist Returns ------- offset : tuple(int, int) offset in order and column to be applied to each line in the linelist """ _, ax = plt.subplots() ap = AlignmentPlot(ax, obs, lines, plot_title=self.plot_title) ap.connect() plt.show() offset = ap.offset return offset
[docs] def apply_alignment_offset(self, lines, offset, select=None): """ Apply an offset to the linelist Parameters ---------- lines : recarray of shape (nlines,) reference linelist offset : tuple(int, int) offset in (order, column) select : array of shape(nlines,), optional Mask that defines which lines the offset applies to Returns ------- lines : recarray of shape (nlines,) linelist with offset applied """ if select is None: select = slice(None) lines["xfirst"][select] += offset[1] lines["xlast"][select] += offset[1] lines["posm"][select] += offset[1] lines["order"][select] += offset[0] return lines
[docs] def align(self, obs, lines): """ Align the observation with the reference spectrum Either automatically using cross correlation or manually (visually) Parameters ---------- obs : array[nrow, ncol] observed wavelength calibration spectrum (e.g. obs=ThoriumArgon) lines : struct_array reference line data manual : bool, optional wether to manually align the spectra (default: False) plot : bool, optional wether to plot the alignment (default: False) Returns ------- offset: tuple(int, int) offset in order and column """ obs = np.ma.filled(obs, 0) if not self.manual: # make image from lines img = self.create_image_from_lines(lines) # Cross correlate with obs image # And determine overall offset correlation = signal.correlate2d(obs, img, mode="same") offset_order, offset_x = np.unravel_index( np.argmax(correlation), correlation.shape ) if self.plot >= 2: plt.imshow(correlation, aspect="auto") plt.vlines(offset_x, -0.5, self.nord - 0.5, color="red") plt.hlines(offset_order, -0.5, self.ncol - 0.5, color="red") if self.plot_title is not None: plt.title(self.plot_title) plt.show() offset_order = offset_order - img.shape[0] / 2 + 1 offset_x = offset_x - img.shape[1] / 2 + 1 offset = [int(offset_order), int(offset_x)] # apply offset lines = self.apply_alignment_offset(lines, offset) if self.shift_window != 0: # Shift individual orders to fit reference # Only allow a small shift here (1%) ? img = self.create_image_from_lines(lines) for i in range(max(offset[0], 0), min(len(obs), len(img))): correlation = signal.correlate(obs[i], img[i], mode="same") width = int(self.ncol * self.shift_window) // 2 low, high = self.ncol // 2 - width, self.ncol // 2 + width offset_x = np.argmax(correlation[low:high]) + low offset_x = int(offset_x - self.ncol / 2 + 1) select = lines["order"] == i lines = self.apply_alignment_offset(lines, (0, offset_x), select) if self.plot or self.manual: offset = self.align_manual(obs, lines) lines = self.apply_alignment_offset(lines, offset) logger.debug(f"Offset order: {offset[0]}, Offset pixel: {offset[1]}") return lines
def _fit_single_line(self, obs, center, width, plot=False): low = int(center - width * 5) low = max(low, 0) high = int(center + width * 5) high = min(high, len(obs)) section = obs[low:high] x = np.arange(low, high, 1) x = np.ma.masked_array(x, mask=np.ma.getmaskarray(section)) coef = util.gaussfit2(x, section) if self.plot >= 2 and plot: x2 = np.linspace(x.min(), x.max(), len(x) * 100) plt.plot(x, section, label="Observation") plt.plot(x2, util.gaussval2(x2, *coef), label="Fit") title = "Gaussian Fit to spectral line" if self.plot_title is not None: title = f"{self.plot_title}\n{title}" plt.title(title) plt.xlabel("x [pixel]") plt.ylabel("Intensity [a.u.]") plt.legend() plt.show() return coef
[docs] def fit_lines(self, obs, lines): """ Determine exact position of each line on the detector based on initial guess This fits a Gaussian to each line, and uses the peak position as a new solution Parameters ---------- obs : array of shape (nord, ncol) observed wavelength calibration image lines : recarray of shape (nlines,) reference line data Returns ------- lines : recarray of shape (nlines,) Updated line information (posm is changed) """ # For each line fit a gaussian to the observation for i, line in tqdm( enumerate(lines), total=len(lines), leave=False, desc="Lines" ): if line["posm"] < 0 or line["posm"] >= obs.shape[1]: # Line outside pixel range continue if line["order"] < 0 or line["order"] >= len(obs): # Line outside order range continue try: coef = self._fit_single_line( obs[int(line["order"])], line["posm"], line["width"], plot=line["flag"], ) lines[i]["posm"] = coef[1] except: # Gaussian fit failed, dont use line lines[i]["flag"] = False return lines
[docs] def build_2d_solution(self, lines, plot=False): """ Create a 2D polynomial fit to flagged lines degree : tuple(int, int), optional polynomial degree of the fit in (column, order) dimension (default: (6, 6)) Parameters ---------- lines : struc_array line data plot : bool, optional wether to plot the solution (default: False) Returns ------- coef : array[degree_x, degree_y] 2d polynomial coefficients """ if self.step_mode: return self.build_step_solution(lines, plot=plot) # Only use flagged data mask = lines["flag"] # True: use line, False: dont use line m_wave = lines["wll"][mask] m_pix = lines["posm"][mask] m_ord = lines["order"][mask] if self.dimensionality == "1D": nord = int(m_ord.max() + 1) coef = np.zeros((nord, self.degree + 1)) for i in range(nord): select = m_ord == i deg = max(min(self.degree, np.count_nonzero(select) - 2), 0) coef[i, -(deg + 1) :] = np.polyfit( m_pix[select], m_wave[select], deg=deg ) elif self.dimensionality == "2D": # 2d polynomial fit with: x = column, y = order, z = wavelength coef = util.polyfit2d(m_pix, m_ord, m_wave, degree=self.degree, plot=False) else: raise ValueError( f"Parameter 'mode' not understood. Expected '1D' or '2D' but got {self.dimensionality}" ) if plot or self.plot >= 2: # pragma: no cover self.plot_residuals(lines, coef, title="Residuals") return coef
[docs] def g(self, x, step_coef_pos, step_coef_diff): try: bins = step_coef_pos digits = np.digitize(x, bins) - 1 except ValueError as e: return np.inf cumsum = np.cumsum(step_coef_diff) x = x + cumsum[digits] return x
[docs] def f(self, x, poly_coef, step_coef_pos, step_coef_diff): xdash = self.g(x, step_coef_pos, step_coef_diff) if np.all(np.isinf(xdash)): return np.inf y = np.polyval(poly_coef, xdash) return y
[docs] def build_step_solution(self, lines, plot=False): """ Fit the least squares fit to the wavelength points, with additional free parameters for detector gaps, e.g. due to stitching. The exact method of the fit depends on the dimensionality. Either way we are using the usual polynomial fit for the wavelength, but the x points are modified beforehand by shifting them some amount, at specific indices. We assume that the stitching effects are distributed evenly and we know how many steps we expect (this is set as "nstep"). Parameters ---------- lines : np.recarray linedata plot : bool, optional whether to plot results or not, by default False Returns ------- coef coefficients of the best fit """ mask = lines["flag"] # True: use line, False: dont use line m_wave = lines["wll"][mask] m_pix = lines["posm"][mask] m_ord = lines["order"][mask] nstep = self.nstep ncol = self.ncol if self.dimensionality == "1D": coef = {} for order in np.unique(m_ord): select = m_ord == order x = xl = m_pix[select] y = m_wave[select] step_coef = np.zeros((nstep, 2)) step_coef[:, 0] = np.linspace(ncol / (nstep + 1), ncol, nstep + 1)[:-1] def func(x, *param): return self.f(x, poly_coef, step_coef[:, 0], param) for i in range(5): poly_coef = np.polyfit(xl, y, self.degree) res, _ = curve_fit(func, x, y, p0=step_coef[:, 1], bounds=[-1, 1]) step_coef[:, 1] = res xl = self.g(x, step_coef[:, 0], step_coef[:, 1]) coef[order] = [poly_coef, step_coef] elif self.dimensionality == "2D": unique = np.unique(m_ord) nord = len(unique) shape = (self.degree[0] + 1, self.degree[1] + 1) n = np.prod(shape) step_coef = np.zeros((nord, nstep, 2)) step_coef[:, :, 0] = np.linspace(ncol / (nstep + 1), ncol, nstep + 1)[:-1] def func(x, *param): x, y = x[: len(x) // 2], x[len(x) // 2 :] theta = np.asarray(param).reshape((nord, nstep)) xl = np.copy(x) for j, i in enumerate(unique): xl[y == i] = self.g(x[y == i], step_coef[j, :, 0], theta[j]) z = polyval2d(xl, y, poly_coef) return z # TODO: this could use some optimization x = np.copy(m_pix) x0 = np.concatenate((m_pix, m_ord)) resid_old = np.inf for k in tqdm(range(5)): poly_coef = util.polyfit2d( x, m_ord, m_wave, degree=self.degree, plot=False ) res, _ = curve_fit(func, x0, m_wave, p0=step_coef[:, :, 1]) step_coef[:, :, 1] = res.reshape((nord, nstep)) for j, i in enumerate(unique): x[m_ord == i] = self.g( m_pix[m_ord == i], step_coef[j][:, 0], step_coef[j][:, 1] ) resid = polyval2d(x, m_ord, poly_coef) - m_wave resid = np.sum(resid ** 2) improvement = resid_old - resid resid_old = resid logger.info( "Iteration: %i, Residuals: %.5g, Improvement: %.5g", k, resid, improvement, ) poly_coef = util.polyfit2d(x, m_ord, m_wave, degree=self.degree, plot=False) step_coef = {i: step_coef[j] for j, i in enumerate(unique)} coef = (poly_coef, step_coef) else: raise ValueError( f"Parameter 'dimensionality' not understood. Expected '1D' or '2D' but got {self.dimensionality}" ) return coef
[docs] def evaluate_step_solution(self, pos, order, solution): if not np.array_equal(np.shape(pos), np.shape(order)): raise ValueError("pos and order must have the same shape") if self.dimensionality == "1D": result = np.zeros(pos.shape) for i in np.unique(order): select = order == i result[select] = self.f( pos[select], solution[i][0], solution[i][1][:, 0], solution[i][1][:, 1], ) elif self.dimensionality == "2D": poly_coef, step_coef = solution pos = np.copy(pos) for i in np.unique(order): pos[order == i] = self.g( pos[order == i], step_coef[i][:, 0], step_coef[i][:, 1] ) result = polyval2d(pos, order, poly_coef) else: raise ValueError( f"Parameter 'mode' not understood, expected '1D' or '2D' but got {self.dimensionality}" ) return result
[docs] def evaluate_solution(self, pos, order, solution): """ Evaluate the 1d or 2d wavelength solution at the given pixel positions and orders Parameters ---------- pos : array pixel position on the detector (i.e. x axis) order : array order of each point solution : array of shape (nord, ndegree) or (degree_x, degree_y) polynomial coefficients. For mode=1D, one set of coefficients per order. For mode=2D, the first dimension is for the positions and the second for the orders mode : str, optional Wether to interpret the solution as 1D or 2D polynomials, by default "1D" Returns ------- result: array Evaluated polynomial Raises ------ ValueError If pos and order have different shapes, or mode is of the wrong value """ if not np.array_equal(np.shape(pos), np.shape(order)): raise ValueError("pos and order must have the same shape") if self.step_mode: return self.evaluate_step_solution(pos, order, solution) if self.dimensionality == "1D": result = np.zeros(pos.shape) for i in np.unique(order): select = order == i result[select] = np.polyval(solution[int(i)], pos[select]) elif self.dimensionality == "2D": result = np.polynomial.polynomial.polyval2d(pos, order, solution) else: raise ValueError( f"Parameter 'mode' not understood, expected '1D' or '2D' but got {self.dimensionality}" ) return result
[docs] def make_wave(self, wave_solution, plot=False): """Expand polynomial wavelength solution into full image Parameters ---------- wave_solution : array of shape(degree,) polynomial coefficients of wavelength solution plot : bool, optional wether to plot the solution (default: False) Returns ------- wave_img : array of shape (nord, ncol) wavelength solution for each point in the spectrum """ y, x = np.indices((self.nord, self.ncol)) wave_img = self.evaluate_solution(x, y, wave_solution) return wave_img
[docs] def auto_id(self, obs, wave_img, lines): """Automatically identify peaks that are close to known lines Parameters ---------- obs : array of shape (nord, ncol) observed spectrum wave_img : array of shape (nord, ncol) wavelength solution image lines : struc_array line data threshold : int, optional difference threshold between line positions in m/s, until which a line is considered identified (default: 1) plot : bool, optional wether to plot the new lines Returns ------- lines : struct_array line data with new flags """ new_lines = [] if self.atlas is not None: # For each order, find the corresponding section in the Atlas # Look for strong lines in the atlas and the spectrum that match in position # Add new lines to the linelist width_of_atlas_peaks = 3 for order in range(obs.shape[0]): mask = ~np.ma.getmask(obs[order]) index_mask = np.arange(len(mask))[mask] data_obs = obs[order, mask] wave_obs = wave_img[order, mask] threshold_of_peak_closeness = ( np.diff(wave_obs) / wave_obs[:-1] * speed_of_light ) threshold_of_peak_closeness = np.max(threshold_of_peak_closeness) wmin, wmax = wave_obs[0], wave_obs[-1] imin, imax = np.searchsorted(self.atlas.wave, (wmin, wmax)) wave_atlas = self.atlas.wave[imin:imax] data_atlas = self.atlas.flux[imin:imax] if len(data_atlas) == 0: continue data_atlas = data_atlas / data_atlas.max() line = lines[ (lines["order"] == order) & (lines["wll"] > wmin) & (lines["wll"] < wmax) ] peaks_atlas, peak_info_atlas = signal.find_peaks( data_atlas, height=0.01, width=width_of_atlas_peaks ) peaks_obs, peak_info_obs = signal.find_peaks( data_obs, height=0.01, width=0 ) for i, p in enumerate(peaks_atlas): # Look for an existing line in the vicinityq wpeak = wave_atlas[p] diff = np.abs(line["wll"] - wpeak) / wpeak * speed_of_light if np.any(diff < threshold_of_peak_closeness): # Line already in the linelist, ignore continue else: # Look for matching peak in observation diff = ( np.abs(wpeak - wave_obs[peaks_obs]) / wpeak * speed_of_light ) imin = np.argmin(diff) if diff[imin] < threshold_of_peak_closeness: # Add line to linelist # Location on the detector # Include the masked areas!!! ipeak = peaks_obs[imin] ipeak = index_mask[ipeak] # relative height of the peak hpeak = data_obs[peaks_obs[imin]] wipeak = peak_info_obs["widths"][imin] # wave, order, pos, width, height, flag new_lines.append([wpeak, order, ipeak, wipeak, hpeak, True]) # Add new lines to the linelist if len(new_lines) != 0: new_lines = np.array(new_lines).T new_lines = LineList.from_list(*new_lines) new_lines = self.fit_lines(obs, new_lines) lines.append(new_lines) # Option 1: # Step 1: Loop over unused lines in lines # Step 2: find peaks in neighbourhood # Step 3: Toggle flag on if close counter = 0 for i, line in enumerate(lines): if line["flag"]: # Line is already in use continue if line["order"] < 0 or line["order"] >= self.nord: # Line outside order range continue iord = int(line["order"]) if line["wll"] < wave_img[iord][0] or line["wll"] >= wave_img[iord][-1]: # Line outside pixel range continue wl = line["wll"] width = line["width"] * 5 wave = wave_img[iord] order_obs = obs[iord] # Find where the line should be try: idx = np.digitize(wl, wave) except ValueError: # Wavelength solution is not monotonic idx = np.where(wave >= wl)[0][0] low = int(idx - width) low = max(low, 0) high = int(idx + width) high = min(high, len(order_obs)) vec = order_obs[low:high] if np.all(np.ma.getmaskarray(vec)): continue # Find the best fitting peak # TODO use gaussian fit? peak_idx, _ = signal.find_peaks(vec, height=np.ma.median(vec), width=3) if len(peak_idx) > 0: peak_pos = np.copy(peak_idx).astype(float) for j in range(len(peak_idx)): try: coef = self._fit_single_line(vec, peak_idx[j], line["width"]) peak_pos[j] = coef[1] except: peak_pos[j] = np.nan pass pos_wave = np.interp(peak_pos, np.arange(high - low), wave[low:high]) residual = np.abs(wl - pos_wave) / wl * speed_of_light idx = np.argmin(residual) if residual[idx] < self.threshold: counter += 1 lines["flag"][i] = True lines["posm"][i] = low + peak_pos[idx] logger.info("AutoID identified %i new lines", counter + len(new_lines)) return lines
[docs] def calculate_residual(self, wave_solution, lines): """ Calculate all residuals of all given lines Residual = (Wavelength Solution - Expected Wavelength) / Expected Wavelength * speed of light Parameters ---------- wave_solution : array of shape (degree_x, degree_y) polynomial coefficients of the wavelength solution (in numpy format) lines : recarray of shape (nlines,) contains the position of the line on the detector (posm), the order (order), and the expected wavelength (wll) Returns ------- residual : array of shape (nlines,) Residual of each line in m/s """ x = lines["posm"] y = lines["order"] mask = ~lines["flag"] solution = self.evaluate_solution(x, y, wave_solution) residual = (solution - lines["wll"]) / lines["wll"] * speed_of_light residual = np.ma.masked_array(residual, mask=mask) return residual
[docs] def reject_outlier(self, residual, lines): """ Reject the strongest outlier Parameters ---------- residual : array of shape (nlines,) residuals of all lines lines : recarray of shape (nlines,) line data Returns ------- lines : struct_array line data with one more flagged line residual : array of shape (nlines,) residuals of each line, with outliers masked (including the new one) """ # Strongest outlier ibad = np.ma.argmax(np.abs(residual)) lines["flag"][ibad] = False return lines
[docs] def reject_lines(self, lines, plot=False): """ Reject the largest outlier one by one until all residuals are lower than the threshold Parameters ---------- lines : recarray of shape (nlines,) Line data with pixel position, and expected wavelength threshold : float, optional upper limit for the residual, by default 100 degree : tuple, optional polynomial degree of the wavelength solution (pixel, column) (default: (6, 6)) plot : bool, optional Wether to plot the results (default: False) Returns ------- lines : recarray of shape (nlines,) Line data with updated flags """ wave_solution = self.build_2d_solution(lines) residual = self.calculate_residual(wave_solution, lines) nbad = 0 while np.ma.any(np.abs(residual) > self.threshold): lines = self.reject_outlier(residual, lines) wave_solution = self.build_2d_solution(lines) residual = self.calculate_residual(wave_solution, lines) nbad += 1 logger.info("Discarding %i lines", nbad) if plot or self.plot >= 2: # pragma: no cover mask = lines["flag"] _, axis = plt.subplots() axis.plot(lines["order"][mask], residual[mask], "X", label="Accepted Lines") axis.plot( lines["order"][~mask], residual[~mask], "D", label="Rejected Lines" ) axis.set_xlabel("Order") axis.set_ylabel("Residual [m/s]") axis.set_title("Residuals versus order") axis.legend() fig, ax = plt.subplots( nrows=self.nord // 2, ncols=2, sharex=True, squeeze=False ) plt.subplots_adjust(hspace=0) fig.suptitle("Residuals of each order versus image columns") for iord in range(self.nord): order_lines = lines[lines["order"] == iord] solution = self.evaluate_solution( order_lines["posm"], order_lines["order"], wave_solution ) # Residual in m/s residual = ( (solution - order_lines["wll"]) / order_lines["wll"] * speed_of_light ) mask = order_lines["flag"] ax[iord // 2, iord % 2].plot( order_lines["posm"][mask], residual[mask], "X", label="Accepted Lines", ) ax[iord // 2, iord % 2].plot( order_lines["posm"][~mask], residual[~mask], "D", label="Rejected Lines", ) # ax[iord // 2, iord % 2].tick_params(labelleft=False) ax[iord // 2, iord % 2].set_ylim( -self.threshold * 1.5, +self.threshold * 1.5 ) ax[-1, 0].set_xlabel("x [pixel]") ax[-1, 1].set_xlabel("x [pixel]") ax[0, 0].legend() plt.show() return lines
[docs] def plot_results(self, wave_img, obs): plt.subplot(211) title = "Wavelength solution with Wavelength calibration spectrum\nOrders are in different colours" if self.plot_title is not None: title = f"{self.plot_title}\n{title}" plt.title(title) plt.xlabel("Wavelength") plt.ylabel("Observed spectrum") for i in range(self.nord): plt.plot(wave_img[i], obs[i], label="Order %i" % i) plt.subplot(212) plt.title("2D Wavelength solution") plt.imshow( wave_img, aspect="auto", origin="lower", extent=(0, self.ncol, 0, self.nord) ) cbar = plt.colorbar() plt.xlabel("Column") plt.ylabel("Order") cbar.set_label("Wavelength [Å]") plt.show()
[docs] def plot_residuals(self, lines, coef, title="Residuals"): orders = np.unique(lines["order"]) norders = len(orders) if self.plot_title is not None: title = f"{self.plot_title}\n{title}" plt.suptitle(title) nplots = int(np.ceil(norders / 2)) for i, order in enumerate(orders): plt.subplot(nplots, 2, i + 1) order_lines = lines[lines["order"] == order] if len(order_lines) > 0: residual = self.calculate_residual(coef, order_lines) plt.plot(order_lines["posm"], residual, "rX") plt.hlines([0], 0, self.ncol) plt.xlim(0, self.ncol) plt.ylim(-self.threshold, self.threshold) if (i + 1) not in [norders, norders - 1]: plt.xticks([]) else: plt.xlabel("x [Pixel]") if (i + 1) % 2 == 0: plt.yticks([]) # else: # plt.yticks([-self.threshold, 0, self.threshold]) plt.subplots_adjust(hspace=0, wspace=0.1) # order = 0 # order_lines = lines[lines["order"] == order] # if len(order_lines) > 0: # residual = self.calculate_residual(coef, order_lines) # plt.plot(order_lines["posm"], residual, "rX") # plt.hlines([0], 0, self.ncol) # plt.xlim(0, self.ncol) # plt.ylim(-self.threshold, self.threshold) # plt.xlabel("x [Pixel]") # plt.ylabel("Residual [m/s]") plt.show()
def _find_peaks(self, comb): # Find peaks in the comb spectrum # Run find_peak twice # once to find the average distance between peaks # once for real (disregarding close peaks) c = comb - np.ma.min(comb) width = self.lfc_peak_width height = np.ma.median(c) peaks, _ = signal.find_peaks(c, height=height, width=width) distance = np.median(np.diff(peaks)) // 4 peaks, _ = signal.find_peaks(c, height=height, distance=distance, width=width) # Fit peaks with gaussian to get accurate position new_peaks = peaks.astype(float) width = np.mean(np.diff(peaks)) // 2 for j, p in enumerate(peaks): idx = p + np.arange(-width, width + 1, 1) idx = np.clip(idx, 0, len(c) - 1).astype(int) try: coef = util.gaussfit3(np.arange(len(idx)), c[idx]) new_peaks[j] = coef[1] + p - width except RuntimeError: new_peaks[j] = p n = np.arange(len(peaks)) # keep peaks within the range mask = (new_peaks > 0) & (new_peaks < len(c)) n, new_peaks = n[mask], new_peaks[mask] return n, new_peaks
[docs] def calculate_AIC(self, lines, wave_solution): if self.step_mode: if self.dimensionality == "1D": k = 1 for _, v in wave_solution.items(): k += np.size(v[0]) k += np.size(v[1]) elif self.dimensionality == "2D": k = 1 poly_coef, steps_coef = wave_solution for _, v in steps_coef.items(): k += np.size(v) k += np.size(poly_coef) else: k = np.size(wave_solution) + 1 # We get the residuals in velocity space # but need to remove the speed of light component, to get dimensionless parameters x = lines["posm"] y = lines["order"] mask = ~lines["flag"] solution = self.evaluate_solution(x, y, wave_solution) rss = (solution - lines["wll"]) / lines["wll"] # rss = self.calculate_residual(wave_solution, lines) # rss /= speed_of_light n = rss.size rss = np.ma.sum(rss ** 2) # As per Wikipedia https://en.wikipedia.org/wiki/Akaike_information_criterion logl = np.log(rss) aic = 2 * k + n * logl self.logl = logl self.aicc = aic + (2 * k ** 2 + 2 * k) / (n - k - 1) self.aic = aic return aic
[docs] def execute(self, obs, lines): """ Perform the whole wavelength calibration procedure with the current settings Parameters ---------- obs : array of shape (nord, ncol) observed image lines : recarray of shape (nlines,) reference linelist Returns ------- wave_img : array of shape (nord, ncol) Wavelength solution for each pixel Raises ------ NotImplementedError If polarimitry flag is set """ if self.polarim: raise NotImplementedError("polarized orders not implemented yet") self.nord, self.ncol = obs.shape lines = LineList(lines) if self.element is not None: try: self.atlas = LineAtlas(self.element, self.medium) except FileNotFoundError: logger.warning("No Atlas file found for element %s", self.element) self.atlas = None except: self.atlas = None else: self.atlas = None obs, lines = self.normalize(obs, lines) # Step 1: align obs and reference lines = self.align(obs, lines) # Keep original positions for reference lines["posc"] = np.copy(lines["posm"]) # Step 2: Locate the lines on the detector, and update the pixel position # lines["flag"] = True lines = self.fit_lines(obs, lines) for i in range(self.iterations): logger.info(f"Wavelength calibration iteration: {i}") # Step 3: Create a wavelength solution on known lines wave_solution = self.build_2d_solution(lines) wave_img = self.make_wave(wave_solution) # Step 4: Identify lines that fit into the solution lines = self.auto_id(obs, wave_img, lines) # Step 5: Reject outliers lines = self.reject_lines(lines) # lines = self.reject_lines(lines) logger.info( "Number of lines used for wavelength calibration: %i", np.count_nonzero(lines["flag"]), ) # Step 6: build final 2d solution wave_solution = self.build_2d_solution(lines, plot=self.plot) wave_img = self.make_wave(wave_solution) if self.plot: self.plot_results(wave_img, obs) aic = self.calculate_AIC(lines, wave_solution) logger.info("AIC of wavelength fit: %f", aic) # np.savez("cs_lines.npz", cs_lines=lines.data) return wave_img, wave_solution
[docs]class WavelengthCalibrationComb(WavelengthCalibration):
[docs] def execute(self, comb, wave, lines=None): self.nord, self.ncol = comb.shape # TODO give everything better names pixel, order, wavelengths = [], [], [] n_all, f_all = [], [] comb = np.ma.masked_array(comb, mask=comb <= 0) for i in range(self.nord): # Find Peak positions in current order n, peaks = self._find_peaks(comb[i]) # Determine the n-offset of this order, relative to the anchor frequency # Use the existing absolute wavelength calibration as reference y_ord = np.full(len(peaks), i) w_old = interp1d(np.arange(len(wave[i])), wave[i], kind="cubic")(peaks) f_old = speed_of_light / w_old # fr: repeating frequency # fd: anchor frequency of this order, needs to be shifted to the absolute reference frame fr = np.median(np.diff(f_old)) fd = np.median(f_old % fr) n_raw = (f_old - fd) / fr n = np.round(n_raw) if np.any(np.abs(n_raw - n) > 0.3): logger.warning( "Bad peaks detected in the frequency comb in order %i", i ) fr, fd = polyfit(n, f_old, deg=1) n_offset = 0 # The first order is used as the baseline for all other orders # The choice is arbitrary and doesn't matter if i == 0: f0 = fd n_offset = 0 else: # n0: shift in n, relative to the absolute reference # shift n to the absolute grid, so that all peaks are given by the same f0 n_offset = (f0 - fd) / fr n_offset = int(round(n_offset)) n -= n_offset fd += n_offset * fr n = np.abs(n) n_all += [n] f_all += [f_old] pixel += [peaks] order += [y_ord] logger.debug( "LFC Order: %i, f0: %.3f, fr: %.5f, n0: %.2f", i, fd, fr, n_offset ) # Here we postualte that m * lambda = const # where m is the peak number # this is the result of the grating equation # at least const is roughly constant for neighbouring peaks correct = True if correct: w_all = [speed_of_light / f for f in f_all] mw_all = [m * w for m, w in zip(n_all, w_all)] y = np.concatenate(mw_all) gap = np.median(y) corr = np.zeros(self.nord) for i in range(self.nord): corri = gap / w_all[i] - n_all[i] corri = np.median(corri) corr[i] = np.round(corri) n_all[i] += corr[i] logger.debug("LFC order offset correction: %s", corr) for i in range(self.nord): coef = polyfit(n_all[i], n_all[i] * w_all[i], deg=5) mw = np.polyval(coef, n_all[i]) w_all[i] = mw / n_all[i] f_all[i] = speed_of_light / w_all[i] # Merge Data n_all = np.concatenate(n_all) f_all = np.concatenate(f_all) pixel = np.concatenate(pixel) order = np.concatenate(order) # Fit f0 and fr to all data # (fr, f0), cov = np.polyfit(n_all, f_all, deg=1, cov=True) fr, f0 = polyfit(n_all, f_all, deg=1) logger.debug("Laser Frequency Comb Anchor Frequency: %.3f 10**10 Hz", f0) logger.debug("Laser Frequency Comb Repeating Frequency: %.5f 10**10 Hz", fr) # All peaks are then given by f0 + n * fr wavelengths = speed_of_light / (f0 + n_all * fr) flag = np.full(len(wavelengths), True) laser_lines = np.rec.fromarrays( (wavelengths, pixel, pixel, order, flag), names=("wll", "posm", "posc", "order", "flag"), ) # Use now better resolution to find the new solution # A single pass of discarding outliers should be enough coef = self.build_2d_solution(laser_lines) # resid = self.calculate_residual(coef, laser_lines) # laser_lines["flag"] = np.abs(resid) < self.threshold # coef = self.build_2d_solution(laser_lines) new_wave = self.make_wave(coef) aic = self.calculate_AIC(laser_lines, coef) self.n_lines_good = np.count_nonzero(laser_lines["flag"]) logger.info( f"Laser Frequency Comb solution based on {self.n_lines_good} lines." ) if self.plot: residual = wave - new_wave residual = residual.ravel() area = np.percentile(residual, (32, 50, 68)) area = area[0] - 5 * (area[1] - area[0]), area[0] + 5 * (area[2] - area[1]) plt.hist(residual, bins=100, range=area) title = "ThAr - LFC" if self.plot_title is not None: title = f"{self.plot_title}\n{title}" plt.title(title) plt.xlabel(r"$\Delta\lambda$ [Å]") plt.ylabel("N") plt.show() if self.plot: if lines is not None: self.plot_residuals( lines, coef, title="GasLamp Line Residuals in the Laser Frequency Comb Solution", ) self.plot_residuals( laser_lines, coef, title="Laser Frequency Comb Peak Residuals in the LFC Solution", ) if self.plot: wave_img = wave title = "Difference between GasLamp Solution and Laser Frequency Comb solution\nEach plot shows one order" if self.plot_title is not None: title = f"{self.plot_title}\n{title}" plt.suptitle(title) for i in range(len(new_wave)): plt.subplot(len(new_wave) // 4 + 1, 4, i + 1) plt.plot(wave_img[i] - new_wave[i]) plt.show() if self.plot: self.plot_results(new_wave, comb) return new_wave
[docs]class WavelengthCalibrationInitialize(WavelengthCalibration): def __init__( self, degree=2, plot=False, plot_title="Wavecal Initial", wave_delta=20, nwalkers=100, steps=50_000, resid_delta=1000, cutoff=5, smoothing=0, element="thar", medium="vac", ): super().__init__( degree=degree, element=element, medium=medium, plot=plot, plot_title=plot_title, dimensionality="1D", ) #:float: wavelength uncertainty on the initial guess in Angstrom self.wave_delta = wave_delta #:int: number of walkers in the MCMC self.nwalkers = nwalkers #:int: number of steps in the MCMC self.steps = steps #:float: residual uncertainty allowed when matching observation with known lines self.resid_delta = resid_delta #:float: gaussian smoothing applied to the wavecal spectrum before the MCMC in pixel scale, disable it by setting it to 0 self.smoothing = smoothing #:float: minimum value in the spectrum to be considered a spectral line, if the value is above (or equal 1) it defines the percentile of the spectrum self.cutoff = cutoff
[docs] def get_cutoff(self, spectrum): if self.cutoff == 0: cutoff = None elif self.cutoff < 1: cutoff = self.cutoff else: cutoff = np.nanpercentile(spectrum[spectrum != 0], self.cutoff) return cutoff
[docs] def normalize(self, spectrum): smoothing = self.smoothing spectrum = np.copy(spectrum) spectrum -= np.nanmedian(spectrum) if smoothing != 0: spectrum = gaussian_filter1d(spectrum, smoothing) spectrum[spectrum < 0] = 0 spectrum /= np.max(spectrum) return spectrum
[docs] def determine_wavelength_coefficients( self, spectrum, atlas, wave_range, ) -> np.ndarray: """ Determines the wavelength polynomial coefficients of a spectrum, based on an line atlas with known spectral lines, and an initial guess for the wavelength range. The calculation uses an MCMC approach to sample the probability space and find the best cross correlation value, between observation and atlas. Parameters ---------- spectrum : array observed spectrum at each pixel atlas : LineAtlas atlas containing a known spectrum with wavelength and flux wave_range : 2-tuple initial wavelength guess (begin, end) degrees : int, optional number of degrees of the wavelength polynomial, lower numbers yield better results, by default 2 w_range : float, optional uncertainty on the initial wavelength guess in Ansgtrom, by default 20 nwalkers : int, optional number of walkers for the MCMC, more is better but increases the time, by default 100 steps : int, optional number of steps in the MCMC per walker, more is better but increases the time, by default 20_000 plot : bool, optional whether to plot the results or not, by default False Returns ------- coef : array polynomial coefficients in numpy order """ spectrum = np.asarray(spectrum) assert self.degree >= 2, "The polynomial degree must be at least 2" assert spectrum.ndim == 1, "The spectrum should only have 1 dimension" assert self.wave_delta > 0, "The wavelength uncertainty needs to be positive" n_features = spectrum.shape[0] n_output = ndim = self.degree + 1 # Normalize the spectrum, and copy it just in case spectrum = self.normalize(spectrum) cutoff = self.get_cutoff(spectrum) # The pixel scale used for everything else x = np.arange(n_features) # Initial guess for the wavelength solution coef = np.zeros(n_output) coef[-1] = wave_range[0] coef[-2] = (wave_range[-1] - wave_range[0]) / n_features # We scale every coefficient to roughly order 1 # this is then in units of the maximum offset due to a change in this value # in angstrom w_scale = 1 / np.power(n_features, range(n_output)) factors = w_scale[::-1] coef /= factors # Here we define the functions we need for the MCMC def polyval_vectorize(p, x, where=None): n_poly, n_coef = p.shape n_points = x.shape[0] y = np.zeros((n_poly, n_points)) if where is not None: for i in range(n_coef): y[where] *= x y[where] += p[where, i, None] else: for i in range(n_coef): y *= x y += p[:, i, None] return y def log_prior(p): prior = np.zeros(p.shape[0]) prior[np.any(~np.isfinite(p), axis=1)] = -np.inf prior[np.any(np.abs(p - coef) > self.wave_delta, axis=1)] = -np.inf return prior def log_prior_2(w): # Chech that w is increasing prior = np.zeros(w.shape[0]) prior[np.any(w[:, 1:] < w[:, :-1], axis=1)] = -np.inf prior[w[:, 0] < wave_range[0] - self.wave_delta] = -np.inf prior[w[:, -1] > wave_range[1] + self.wave_delta] = -np.inf return prior def log_prob(p): # Check that p is within bounds prior = log_prior(p) where = np.isfinite(prior) # Calculate the wavelength scale w = polyval_vectorize(p * factors, x, where=where) # Check that it is monotonically increasing prior += log_prior_2(w) where = np.isfinite(prior) y = np.zeros((p.shape[0], x.shape[0])) y[where, :] = np.interp(w[where, :], atlas.wave, atlas.flux) y[where, :] /= np.max(y[where, :], axis=1)[:, None] # This is the cross correlation value squared cross = np.sum(y * spectrum, axis=1) ** 2 # chi2 = - np.sum((y - spectrum)**2, axis=1) # chi2 = - np.sum((np.where(y > 0.01, 1, 0) - np.where(spectrum > 0.01, 1, 0))**2, axis=1) # this is the same as above, but a lot faster thanks to the magic of bitwise xor if cutoff is not None: chi2 = (y > cutoff) ^ (spectrum > cutoff) chi2 = -np.count_nonzero(chi2, axis=1) / 20 else: chi2 = -np.sum((y - spectrum) ** 2, axis=1) / 20 return prior + cross + chi2 p0 = np.zeros((self.nwalkers, ndim)) p0 += coef[None, :] p0 += np.random.uniform( low=-self.wave_delta, high=self.wave_delta, size=(self.nwalkers, ndim) ) sampler = emcee.EnsembleSampler( self.nwalkers, ndim, log_prob, vectorize=True, moves=[(emcee.moves.DEMove(), 0.8), (emcee.moves.DESnookerMove(), 0.2)], ) state = sampler.run_mcmc(p0, self.steps, progress=True) tau = sampler.get_autocorr_time(quiet=True) burnin = int(2 * np.max(tau)) thin = int(0.5 * np.min(tau)) samples = sampler.get_chain(discard=burnin, thin=thin, flat=True) low, mid, high = np.percentile(samples, [32, 50, 68], axis=0) coef = mid * factors if self.plot: fig = corner.corner(samples, truths=mid) plt.show() wave = np.polyval(coef, x) y = np.interp(wave, atlas.wave, atlas.flux) y /= np.max(y) plt.plot(wave, spectrum) plt.plot(wave, y) plt.show() return coef
[docs] def create_new_linelist_from_solution( self, spectrum, wavelength, atlas, order, ) -> LineList: """ Create a new linelist based on an existing wavelength solution for a spectrum, and a line atlas with known lines. The linelist is the one used by the rest of PyReduce wavelength calibration. Observed lines are matched with the lines in the atlas to improve the wavelength solution. Parameters ---------- spectrum : array Observed spectrum at each pixel wavelength : array Wavelength of spectrum at each pixel atlas : LineAtlas Atlas with wavelength of known lines order : int Order of the spectrum within the detector resid_delta : float, optional Maximum residual allowed between a peak and the closest line in the atlas, to still match them, in m/s, by default 1000. Returns ------- linelist : LineList new linelist with lines from this order """ # The new linelist linelist = LineList() spectrum = np.asarray(spectrum) wavelength = np.asarray(wavelength) assert self.resid_delta > 0, "Residuals Delta must be positive" assert spectrum.ndim == 1, "Spectrum must have only 1 dimension" assert wavelength.ndim == 1, "Wavelength must have only 1 dimension" assert ( spectrum.size == wavelength.size ), "Spectrum and Wavelength must have the same size" n_features = spectrum.shape[0] x = np.arange(n_features) smoothing = self.smoothing # Normalize just in case spectrum = self.normalize(spectrum) cutoff = self.get_cutoff(spectrum) # TODO: make this use another function, and pass the hight as a parameter scopy = np.copy(spectrum) if cutoff is not None: scopy[scopy < cutoff] = 0 _, peaks = self._find_peaks(scopy) peak_wave = np.interp(peaks, x, wavelength) peak_height = np.interp(peaks, x, spectrum) # Here we only look at the lines within range atlas_linelist = atlas.linelist[ (atlas.linelist["wave"] > wavelength[0]) & (atlas.linelist["wave"] < wavelength[-1]) ] residuals = np.zeros_like(peak_wave) for i, pw in enumerate(peak_wave): resid = np.abs(pw - atlas_linelist["wave"]) j = np.argmin(resid) residuals[i] = resid[j] / pw * speed_of_light if residuals[i] < self.resid_delta: linelist.add_line( atlas_linelist["wave"][j], order, peaks[i], 3, peak_height[i], True, ) return linelist
[docs] def execute(self, spectrum, wave_range) -> LineList: atlas = LineAtlas(self.element, self.medium) linelist = LineList() orders = range(spectrum.shape[0]) x = np.arange(spectrum.shape[1]) for order in orders: spec = spectrum[order] wrange = wave_range[order] coef = self.determine_wavelength_coefficients(spec, atlas, wrange) wave = np.polyval(coef, x) linelist_loc = self.create_new_linelist_from_solution( spec, wave, atlas, order ) linelist.append(linelist_loc) return linelist