Source code for pyreduce.util

# -*- coding: utf-8 -*-
"""
Collection of various useful and/or recurring functions across PyReduce
"""

import logging
import os
import warnings
from itertools import product

import matplotlib.pyplot as plt
import numpy as np
import scipy.constants
import scipy.interpolate
from astropy import coordinates as coord
from astropy import time
from astropy import units as u
from astropy.io import fits
from mpl_toolkits.mplot3d import Axes3D
from scipy.linalg import lstsq, solve, solve_banded
from scipy.ndimage import median_filter
from scipy.optimize import curve_fit, least_squares
from scipy.special import binom

from . import __version__
from .clipnflip import clipnflip
from .instruments.instrument_info import modeinfo

logger = logging.getLogger(__name__)


def resample(array, new_size):
    x = np.arange(new_size)
    xp = np.linspace(0, new_size, len(array))
    return np.interp(x, xp, array)

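# Usage sketch (illustrative values, not part of PyReduce): linearly resample
# a spectrum onto a grid with a different number of pixels.
# >>> spec = np.sin(np.linspace(0, np.pi, 100))
# >>> resample(spec, 50).shape
# (50,)
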
def remove_bias(img, ihead, bias, bhead, nfiles=1):
    if bias is not None and bhead is not None:
        b_exptime = bhead["EXPTIME"]
        i_exptime = ihead["EXPTIME"]
        if b_exptime == 0 or i_exptime == 0:
            b_exptime = 1
            i_exptime = nfiles
        img = img - bias * i_exptime / b_exptime
    return img

def in_ipynb():
    try:
        cfg = get_ipython().config
        if cfg["IPKernelApp"]["parent_appname"] == "ipython-notebook":
            return True
        else:
            return False
    except NameError:
        return False

def log_version():
    """For debug purposes"""
    logger.debug("----------------------")
    logger.debug("PyReduce version: %s", __version__)

def start_logging(log_file="log.log"):
    """Start logging to log file and command line

    Parameters
    ----------
    log_file : str, optional
        name of the logging file (default: "log.log")
    """
    # Only create directories if the log file actually has a directory component
    log_dir = os.path.dirname(log_file)
    if log_dir != "":
        os.makedirs(log_dir, exist_ok=True)
    logging.basicConfig(
        filename=log_file,
        level=logging.DEBUG,
        format="%(asctime)-15s - %(levelname)s - %(name)-8s - %(message)s",
    )
    logging.captureWarnings(True)
    log_version()

def vac2air(wl_vac):
    """
    Convert vacuum wavelengths to wavelengths in air

    Author: Nikolai Piskunov
    """
    wl_air = np.copy(wl_vac)  # copy so the input array is not modified in place
    ii = np.where(wl_vac > 2e3)

    sigma2 = (1e4 / wl_vac[ii]) ** 2  # Compute wavenumbers squared
    fact = (
        1e0
        + 8.34254e-5
        + 2.406147e-2 / (130e0 - sigma2)
        + 1.5998e-4 / (38.9e0 - sigma2)
    )
    wl_air[ii] = wl_vac[ii] / fact  # Convert to air wavelength
    return wl_air

def air2vac(wl_air):
    """
    Convert wavelengths in air to vacuum wavelengths

    Author: Nikolai Piskunov
    """
    wl_vac = np.copy(wl_air)
    ii = np.where(wl_air > 1999.352)

    sigma2 = (1e4 / wl_air[ii]) ** 2  # Compute wavenumbers squared
    fact = (
        1e0
        + 8.336624212083e-5
        + 2.408926869968e-2 / (1.301065924522e2 - sigma2)
        + 1.599740894897e-4 / (3.892568793293e1 - sigma2)
    )
    wl_vac[ii] = wl_air[ii] * fact  # Convert to vacuum wavelength
    return wl_vac

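# Usage sketch (illustrative wavelengths in Angstrom): the two conversions are
# approximate inverses of each other above ~2000 A.
# >>> wl = np.array([5000.0, 6500.0])
# >>> np.allclose(air2vac(vac2air(wl)), wl)
# True
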
def swap_extension(fname, ext, path=None):
    """Exchange the extension of the given file with a new one"""
    if path is None:
        path = os.path.dirname(fname)
    nameout = os.path.basename(fname)
    if nameout[-3:] == ".gz":
        nameout = nameout[:-3]
    nameout = nameout.rsplit(".", 1)[0]
    nameout = os.path.join(path, nameout + ext)
    return nameout

def find_first_index(arr, value):
    """Find the first element equal to value in the array arr"""
    try:
        return next(i for i, v in enumerate(arr) if v == value)
    except StopIteration:
        raise ValueError("Value %s not found" % value)

def interpolate_masked(masked):
    """Interpolate masked values from the non-masked values

    Parameters
    ----------
    masked : masked_array
        masked array to interpolate on

    Returns
    -------
    interpolated : array
        interpolated non-masked array
    """
    mask = np.ma.getmaskarray(masked)
    idx = np.nonzero(~mask)[0]
    interpol = np.interp(np.arange(len(masked)), idx, masked[idx])
    return interpol

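# Usage sketch (illustrative values): masked entries are replaced by linear
# interpolation between their unmasked neighbours.
# >>> arr = np.ma.masked_equal([1.0, 0.0, 3.0], 0.0)
# >>> interpolate_masked(arr)
# array([1., 2., 3.])
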
def cutout_image(img, ymin, ymax, xmin, xmax):
    """Cut a section out of an image

    Parameters
    ----------
    img : array
        image
    ymin : array[ncol](int)
        lower y value
    ymax : array[ncol](int)
        upper y value
    xmin : int
        lower x value
    xmax : int
        upper x value

    Returns
    -------
    cutout : array[height, ncol]
        selection of the image
    """
    cutout = np.zeros((ymax[0] - ymin[0] + 1, xmax - xmin), dtype=img.dtype)
    for i, x in enumerate(range(xmin, xmax)):
        cutout[:, i] = img[ymin[x] : ymax[x] + 1, x]
    return cutout

def make_index(ymin, ymax, xmin, xmax, zero=0):
    """Create an index (numpy style) that will select part of an image with changing position but fixed height

    The user is responsible for making sure the height is constant, otherwise
    it will still work, but the subsection will not have the desired format.

    Parameters
    ----------
    ymin : array[ncol](int)
        lower y border
    ymax : array[ncol](int)
        upper y border
    xmin : int
        leftmost column
    xmax : int
        rightmost column
    zero : bool, optional
        if True, the ymin/ymax arrays are indexed from 0 instead of from xmin (default: 0)

    Returns
    -------
    index : tuple(array[height, width], array[height, width])
        numpy index for the selection of a subsection of an image
    """
    # Define the indices for the pixels between two y arrays, e.g. pixels in an order
    # in x: the rows between ymin and ymax
    # in y: the column, but n times to match the x index
    ymin = np.asarray(ymin, dtype=int)
    ymax = np.asarray(ymax, dtype=int)
    xmin = int(xmin)
    xmax = int(xmax)

    if zero:
        zero = xmin

    index_x = np.array(
        [np.arange(ymin[col], ymax[col] + 1) for col in range(xmin - zero, xmax - zero)]
    )
    index_y = np.array(
        [
            np.full(ymax[col] - ymin[col] + 1, col)
            for col in range(xmin - zero, xmax - zero)
        ]
    )
    index = index_x.T, index_y.T + zero

    return index

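# Usage sketch (illustrative values): select a band of constant height that
# follows a curved order across the image; compare cutout_image above.
# >>> img = np.arange(20).reshape(5, 4)
# >>> ymin = np.array([0, 1, 1, 0])
# >>> ymax = np.array([1, 2, 2, 1])
# >>> img[make_index(ymin, ymax, 0, 4)].shape
# (2, 4)
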
def gridsearch(func, grid, args=(), kwargs={}):
    matrix = np.zeros(grid.shape[:-1])

    for idx in np.ndindex(grid.shape[:-1]):
        value = grid[idx]
        print(f"Value: {value}")
        try:
            result = func(value, *args, **kwargs)
            print(f"Success: {result}")
        except Exception as e:
            result = np.nan
            print(f"Failed: {e}")
        finally:
            matrix[idx] = result

    return matrix

def gaussfit(x, y):
    """
    Fit a simple gaussian to data

    gauss(x, a, mu, sigma) = a * exp(-z**2/2)
    with z = (x - mu) / sigma

    Parameters
    ----------
    x : array(float)
        x values
    y : array(float)
        y values

    Returns
    -------
    gauss(x), parameters
        fitted values for x, fit parameters (a, mu, sigma)
    """
    gauss = lambda x, A0, A1, A2: A0 * np.exp(-(((x - A1) / A2) ** 2) / 2)
    popt, _ = curve_fit(gauss, x, y, p0=[max(y), 0, 1])
    return gauss(x, *popt), popt

def gaussfit2(x, y):
    """Fit a gaussian (normal) curve to data x, y

    gauss = A * exp(-(x-mu)**2/(2*sig**2)) + offset

    Parameters
    ----------
    x : array[n]
        x values
    y : array[n]
        y values

    Returns
    -------
    popt : array[4]
        coefficients of the gaussian: A, mu, sigma**2, offset
    """
    gauss = gaussval2

    x = np.ma.compressed(x)
    y = np.ma.compressed(y)

    if len(x) == 0 or len(y) == 0:
        raise ValueError("All values masked")

    if len(x) != len(y):
        raise ValueError("The masks of x and y are different")

    # Find the peak in the center of the image
    weights = np.ones(len(y), dtype=y.dtype)
    midpoint = len(y) // 2
    weights[:midpoint] = np.linspace(0, 1, midpoint, dtype=weights.dtype)
    weights[midpoint:] = np.linspace(1, 0, len(y) - midpoint, dtype=weights.dtype)

    i = np.argmax(y * weights)
    p0 = [y[i], x[i], 1]

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        res = least_squares(
            lambda c: gauss(x, *c, np.ma.min(y)) - y,
            p0,
            loss="soft_l1",
            bounds=(
                [min(np.ma.mean(y), y[i]), np.ma.min(x), 0],
                [np.ma.max(y) * 1.5, np.ma.max(x), len(x) / 2],
            ),
        )
        popt = list(res.x) + [np.min(y)]
    return popt

def gaussfit3(x, y):
    """A very simple (and relatively fast) gaussian fit
    gauss = A * exp(-(x-mu)**2/(2*sig**2)) + offset

    Parameters
    ----------
    x : array of shape (n,)
        x data
    y : array of shape (n,)
        y data

    Returns
    -------
    popt : list of shape (4,)
        Parameters A, mu, sigma**2, offset
    """
    mask = np.ma.getmaskarray(x) | np.ma.getmaskarray(y)
    x, y = x[~mask], y[~mask]

    gauss = gaussval2
    i = np.argmax(y[len(y) // 4 : len(y) * 3 // 4]) + len(y) // 4
    p0 = [y[i], x[i], 1, np.min(y)]

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        popt, _ = curve_fit(gauss, x, y, p0=p0)

    return popt

def gaussfit4(x, y):
    """A very simple (and relatively fast) gaussian fit
    gauss = A * exp(-(x-mu)**2/(2*sig**2)) + offset

    Assumes x is sorted

    Parameters
    ----------
    x : array of shape (n,)
        x data
    y : array of shape (n,)
        y data

    Returns
    -------
    popt : list of shape (4,)
        Parameters A, mu, sigma**2, offset
    """
    gauss = gaussval2
    x = np.ma.compressed(x)
    y = np.ma.compressed(y)

    i = np.argmax(y)
    p0 = [y[i], x[i], 1, np.min(y)]

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        popt, _ = curve_fit(gauss, x, y, p0=p0)

    return popt

def gaussfit_linear(x, y):
    """Transform the gaussian fit into a linear least squares problem,
    and solve that instead of the non-linear curve fit,
    for efficiency reasons (roughly 10 times faster than the curve fit).

    Parameters
    ----------
    x : array of shape (n,)
        x data
    y : array of shape (n,)
        y data

    Returns
    -------
    coef : tuple
        a, mu, sig, offset
    """
    x = x[y > 0]
    y = y[y > 0]

    offset = np.min(y)
    y = y - offset + 1e-12

    weights = y

    d = np.log(y)
    G = np.ones((x.size, 3), dtype=float)
    G[:, 0] = x ** 2
    G[:, 1] = x

    beta, _, _, _ = np.linalg.lstsq(
        (G.T * weights ** 2).T, d * weights ** 2, rcond=None
    )

    a = np.exp(beta[2] - beta[1] ** 2 / (4 * beta[0]))
    sig = -1 / (2 * beta[0])
    mu = -beta[1] / (2 * beta[0])

    return a, mu, sig, offset

def gaussval2(x, a, mu, sig, const):
    return a * np.exp(-((x - mu) ** 2) / (2 * sig)) + const

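# Usage sketch (synthetic data): recover the parameters of a clean gaussian
# with gaussfit3; note that "sig" is the variance sigma**2 throughout.
# >>> xs = np.linspace(-5, 5, 101)
# >>> ys = gaussval2(xs, 2.0, 0.5, 1.0, 0.1)
# >>> A, mu, sig, const = gaussfit3(xs, ys)
# >>> round(float(mu), 2)
# 0.5
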
def gaussbroad(x, y, hwhm):
    """
    Apply gaussian broadening to x, y data with half width half maximum hwhm

    Parameters
    ----------
    x : array(float)
        x values
    y : array(float)
        y values
    hwhm : float > 0
        half width half maximum

    Returns
    -------
    array(float)
        broadened y values
    """
    # alternatively use:
    #   from scipy.ndimage import gaussian_filter1d as gaussbroad
    # but that doesn't have an x coordinate
    nw = len(x)
    dw = (x[-1] - x[0]) / (len(x) - 1)

    if hwhm > 5 * (x[-1] - x[0]):
        return np.full(len(x), sum(y) / len(x))

    nhalf = int(3.3972872 * hwhm / dw)
    ng = 2 * nhalf + 1  # points in gaussian (odd!)
    # wavelength scale of gaussian
    wg = dw * (np.arange(0, ng, 1, dtype=float) - (ng - 1) / 2)
    xg = (0.83255461 / hwhm) * wg  # convenient abscissa
    gpro = (0.46974832 * dw / hwhm) * np.exp(-xg * xg)  # unit area gaussian with given FWHM
    gpro = gpro / np.sum(gpro)

    # Pad spectrum ends to minimize impact of Fourier ringing.
    npad = nhalf + 2  # pad pixels on each end
    spad = np.concatenate((np.full(npad, y[0]), y, np.full(npad, y[-1])))

    # Convolve and trim.
    sout = np.convolve(spad, gpro)  # convolve with gaussian
    sout = sout[npad : npad + nw]  # trim to original data length
    return sout  # return broadened spectrum

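# Usage sketch (synthetic data): broadening a delta spike lowers the peak and
# spreads the flux over neighbouring pixels.
# >>> xs = np.linspace(0, 10, 1000)
# >>> ys = np.zeros(1000)
# >>> ys[500] = 1.0
# >>> bool(gaussbroad(xs, ys, 0.1).max() < 1.0)
# True
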
def polyfit1d(x, y, degree=1, regularization=0):
    idx = np.arange(degree + 1)

    A = np.array([np.power(x, i) for i in idx], dtype=float).T
    b = y.ravel()

    L = np.array([regularization * i ** 2 for i in idx])
    I = np.linalg.inv(A.T @ A + np.diag(L))
    coeff = I @ A.T @ b
    coeff = coeff[::-1]

    return coeff

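# Usage sketch (synthetic data): coefficients are returned highest degree
# first, i.e. in np.polyval order.
# >>> xs = np.linspace(0, 1, 50)
# >>> ys = 2 * xs + 1
# >>> np.allclose(polyfit1d(xs, ys), [2.0, 1.0])
# True
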
def _get_coeff_idx(coeff):
    idx = np.indices(coeff.shape)
    idx = idx.T.swapaxes(0, 1).reshape((-1, 2))
    # degree = coeff.shape
    # idx = [[i, j] for i, j in product(range(degree[0]), range(degree[1]))]
    # idx = np.asarray(idx)
    return idx


def _scale(x, y):
    # Normalize x and y to avoid huge numbers
    # Mean 0, Variation 1
    offset_x, offset_y = np.mean(x), np.mean(y)
    norm_x, norm_y = np.std(x), np.std(y)
    if norm_x == 0:
        norm_x = 1
    if norm_y == 0:
        norm_y = 1
    x = (x - offset_x) / norm_x
    y = (y - offset_y) / norm_y
    return x, y, (norm_x, norm_y), (offset_x, offset_y)


def _unscale(x, y, norm, offset):
    x = x * norm[0] + offset[0]
    y = y * norm[1] + offset[1]
    return x, y

def polyvander2d(x, y, degree):
    # A = np.array([x ** i * y ** j for i, j in idx], dtype=float).T
    A = np.polynomial.polynomial.polyvander2d(x, y, degree)
    return A

def polyscale2d(coeff, scale_x, scale_y, copy=True):
    if copy:
        coeff = np.copy(coeff)
    idx = _get_coeff_idx(coeff)
    for i, j in idx:
        coeff[i, j] /= scale_x ** i * scale_y ** j
    return coeff

def polyshift2d(coeff, offset_x, offset_y, copy=True):
    if copy:
        coeff = np.copy(coeff)
    idx = _get_coeff_idx(coeff)
    # Copy coeff because it changes during the loop
    coeff2 = np.copy(coeff)
    for k, m in idx:
        not_the_same = ~((idx[:, 0] == k) & (idx[:, 1] == m))
        above = (idx[:, 0] >= k) & (idx[:, 1] >= m) & not_the_same
        for i, j in idx[above]:
            b = binom(i, k) * binom(j, m)
            sign = (-1) ** ((i - k) + (j - m))
            offset = offset_x ** (i - k) * offset_y ** (j - m)
            coeff[k, m] += sign * b * coeff2[i, j] * offset
    return coeff

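# Usage sketch (illustrative values): polyshift2d(c, dx, dy) returns the
# coefficients of p(x - dx, y - dy), which is what polyfit2d below needs to
# undo its internal rescaling.
# >>> pv = np.polynomial.polynomial.polyval2d
# >>> c = np.array([[1.0, 2.0], [3.0, 0.0]])  # p(x, y) = 1 + 2*y + 3*x
# >>> c2 = polyshift2d(c, 1.0, 1.0)
# >>> bool(np.isclose(pv(1.0, 1.0, c2), pv(0.0, 0.0, c)))
# True
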
def plot2d(x, y, z, coeff, title=None):
    # regular grid covering the domain of the data
    if x.size > 500:
        choice = np.random.choice(x.size, size=500, replace=False)
    else:
        choice = slice(None, None, None)
    x, y, z = x[choice], y[choice], z[choice]
    X, Y = np.meshgrid(
        np.linspace(np.min(x), np.max(x), 20), np.linspace(np.min(y), np.max(y), 20)
    )
    Z = np.polynomial.polynomial.polyval2d(X, Y, coeff)
    fig = plt.figure()
    ax = fig.add_subplot(projection="3d")
    ax.plot_surface(X, Y, Z, rstride=1, cstride=1, alpha=0.2)
    ax.scatter(x, y, z, c="r", s=50)
    plt.xlabel("X")
    plt.ylabel("Y")
    ax.set_zlabel("Z")
    if title is not None:
        plt.title(title)
    # ax.axis("equal")
    # ax.axis("tight")
    plt.show()

def polyfit2d(
    x, y, z, degree=1, max_degree=None, scale=True, plot=False, plot_title=None
):
    """A simple 2D polynomial fit to data x, y, z.
    The polynomial can be evaluated with numpy.polynomial.polynomial.polyval2d

    Parameters
    ----------
    x : array[n]
        x coordinates
    y : array[n]
        y coordinates
    z : array[n]
        data values
    degree : int, optional
        degree of the polynomial fit (default: 1)
    max_degree : {int, None}, optional
        if given, the maximum combined degree of the coefficients is limited to this value
    scale : bool, optional
        Whether to scale the input arrays x and y to mean 0 and variance 1, to avoid numerical overflows.
        Especially useful at higher degrees. (default: True)
    plot : bool, optional
        whether to plot the fitted surface and data (slow) (default: False)
    plot_title : str, optional
        title of the plot (default: None)

    Returns
    -------
    coeff : array[degree+1, degree+1]
        the polynomial coefficients in numpy 2d format, i.e. coeff[i, j] for x**i * y**j
    """
    # Flatten input
    x = np.asarray(x).ravel()
    y = np.asarray(y).ravel()
    z = np.asarray(z).ravel()

    # Remove masked values
    mask = ~(np.ma.getmask(z) | np.ma.getmask(x) | np.ma.getmask(y))
    x, y, z = x[mask].ravel(), y[mask].ravel(), z[mask].ravel()

    if scale:
        x, y, norm, offset = _scale(x, y)

    # Create combinations of degrees of x and y
    # usually: [(0, 0), (1, 0), (0, 1), (1, 1), (2, 0), ....]
    if np.isscalar(degree):
        degree = (int(degree), int(degree))
    assert len(degree) == 2, "Only 2D polynomials can be fitted"
    degree = [int(degree[0]), int(degree[1])]
    # idx = [[i, j] for i, j in product(range(degree[0] + 1), range(degree[1] + 1))]
    coeff = np.zeros((degree[0] + 1, degree[1] + 1))
    idx = _get_coeff_idx(coeff)

    # Calculate elements 1, x, y, x*y, x**2, y**2, ...
    A = polyvander2d(x, y, degree)

    # Keep only the combinations whose COMBINED power is at most max_degree
    if max_degree is not None:
        mask = idx[:, 0] + idx[:, 1] <= int(max_degree)
        idx = idx[mask]
        A = A[:, mask]

    # Do the least squares fit
    C, *_ = lstsq(A, z)

    # Reorder coefficients into numpy compatible 2d array
    for k, (i, j) in enumerate(idx):
        coeff[i, j] = C[k]

    # Undo the scaling so the coefficients apply to the original coordinates
    if scale:
        coeff = polyscale2d(coeff, *norm, copy=False)
        coeff = polyshift2d(coeff, *offset, copy=False)

    if plot:  # pragma: no cover
        if scale:
            x, y = _unscale(x, y, norm, offset)
        plot2d(x, y, z, coeff, title=plot_title)

    return coeff

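# Usage sketch (synthetic data): fit a plane and evaluate the coefficients
# with numpy's polyval2d.
# >>> xs, ys = np.meshgrid(np.arange(10.0), np.arange(10.0))
# >>> zs = 1 + 2 * xs + 3 * ys
# >>> c = polyfit2d(xs, ys, zs, degree=1)
# >>> np.allclose(np.polynomial.polynomial.polyval2d(xs, ys, c), zs)
# True
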
def polyfit2d_2(x, y, z, degree=1, x0=None, loss="arctan", method="trf", plot=False):
    x = x.ravel()
    y = y.ravel()
    z = z.ravel()

    if np.isscalar(degree):
        degree_x = degree_y = degree + 1
    else:
        degree_x = degree[0] + 1
        degree_y = degree[1] + 1

    polyval2d = np.polynomial.polynomial.polyval2d

    def func(c):
        c = c.reshape(degree_x, degree_y)
        value = polyval2d(x, y, c)
        return value - z

    if x0 is None:
        x0 = np.zeros(degree_x * degree_y)
    else:
        x0 = x0.ravel()

    res = least_squares(func, x0, loss=loss, method=method)
    coef = res.x
    coef = coef.reshape(degree_x, degree_y)

    if plot:  # pragma: no cover
        # regular grid covering the domain of the data
        if x.size > 500:
            choice = np.random.choice(x.size, size=500, replace=False)
        else:
            choice = slice(None, None, None)
        x, y, z = x[choice], y[choice], z[choice]
        X, Y = np.meshgrid(
            np.linspace(np.min(x), np.max(x), 20), np.linspace(np.min(y), np.max(y), 20)
        )
        Z = np.polynomial.polynomial.polyval2d(X, Y, coef)
        fig = plt.figure()
        ax = fig.add_subplot(projection="3d")
        ax.plot_surface(X, Y, Z, rstride=1, cstride=1, alpha=0.2)
        ax.scatter(x, y, z, c="r", s=50)
        plt.xlabel("X")
        plt.ylabel("Y")
        ax.set_zlabel("Z")
        # ax.axis("equal") and ax.axis("tight") are not supported on 3d axes
        plt.show()

    return coef

def bezier_interp(x_old, y_old, x_new):
    """
    Bezier interpolation, based on the scipy methods

    This mostly sanitizes the input by removing masked values and duplicate entries.
    Note that in case of duplicate entries (in x_old) the results are not well defined,
    as only one of the entries is used and the other is discarded.

    Parameters
    ----------
    x_old : array[n]
        old x values
    y_old : array[n]
        old y values
    x_new : array[m]
        new x values

    Returns
    -------
    y_new : array[m]
        new y values
    """
    # Handle masked arrays
    if np.ma.is_masked(x_old):
        x_old = np.ma.compressed(x_old)
        y_old = np.ma.compressed(y_old)

    # avoid duplicate entries in x
    assert x_old.size == y_old.size
    x_old, index = np.unique(x_old, return_index=True)
    y_old = y_old[index]

    knots, coef, order = scipy.interpolate.splrep(x_old, y_old, s=0)
    y_new = scipy.interpolate.BSpline(knots, coef, order)(x_new)
    return y_new

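# Usage sketch (synthetic data): the interpolant passes through the input
# points exactly (s=0 spline).
# >>> xs = np.linspace(0, 1, 10)
# >>> np.allclose(bezier_interp(xs, np.sin(xs), xs), np.sin(xs))
# True
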
def safe_interpolation(x_old, y_old, x_new=None, fill_value=0):
    """
    'Safe' interpolation method that should avoid the common pitfalls of spline interpolation

    masked arrays are compressed, i.e. only non-masked entries are used;
    NaN input in x_old and y_old is removed;
    only unique x values are used, corresponding y values are 'random';
    if all else fails, revert to linear interpolation

    Parameters
    ----------
    x_old : array of size (n,)
        x values of the data
    y_old : array of size (n,)
        y values of the data
    x_new : array of size (m,) or None, optional
        x values of the interpolated values
        if None will return the interpolator object (default: None)

    Returns
    -------
    y_new : array of size (m,) or interpolator
        if x_new was given, return the interpolated values
        otherwise return the interpolator object
    """
    # Handle masked arrays
    if np.ma.is_masked(x_old):
        x_old = np.ma.compressed(x_old)
        y_old = np.ma.compressed(y_old)

    mask = np.isfinite(x_old) & np.isfinite(y_old)
    x_old = x_old[mask]
    y_old = y_old[mask]

    # avoid duplicate entries in x
    # also sorts data, which allows us to use assume_sorted below
    x_old, index = np.unique(x_old, return_index=True)
    y_old = y_old[index]

    try:
        interpolator = scipy.interpolate.interp1d(
            x_old,
            y_old,
            kind="cubic",
            fill_value=fill_value,
            bounds_error=False,
            assume_sorted=True,
        )
    except ValueError:
        logger.warning(
            "Could not instantiate cubic spline interpolation, using linear instead"
        )
        interpolator = scipy.interpolate.interp1d(
            x_old,
            y_old,
            kind="linear",
            fill_value=fill_value,
            bounds_error=False,
            assume_sorted=True,
        )

    if x_new is not None:
        return interpolator(x_new)
    else:
        return interpolator

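# Usage sketch (synthetic data): a cubic spline through samples of x**2
# reproduces the quadratic.
# >>> xs = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
# >>> round(float(safe_interpolation(xs, xs ** 2, np.array([2.5]))[0]), 6)
# 6.25
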
def bottom(f, order=1, iterations=40, eps=0.001, poly=False, weight=1, **kwargs):
    """
    bottom tries to fit a smooth curve to the lower envelope of 1D data array f.
    Filter size "order" together with the total number of iterations determine
    the smoothness and the quality of the fit. The total number of iterations
    can be controlled by limiting the maximum number of iterations (iterations)
    and/or by setting the convergence criterion for the fit (eps).

    04-Nov-2000 N.Piskunov wrote.
    09-Nov-2011 NP added weights and 2nd derivative constraint as LAM2

    Parameters
    ----------
    f : array[n]
        1D data array to fit
    order : int
        smoothing parameter of the optimal filter (or polynomial degree if poly is True)
    iterations : int
        maximum number of iterations (default: 40)
    eps : float
        convergence level (default: 0.001)
    min : float
        minimum function values to be considered (default: min(f))
    max : float
        maximum function values to be considered (default: max(f))
    lambda2 : float
        constraint on 2nd derivative
    weight : array(float)
        vector of weights
    """
    mn = kwargs.get("min", np.min(f))
    mx = kwargs.get("max", np.max(f))
    lambda2 = kwargs.get("lambda2", -1)

    if poly:
        j = np.where((f >= mn) & (f <= mx))
        xx = np.linspace(-1, 1, num=len(f))
        fmin = np.min(f[j]) - 1
        fmax = np.max(f[j]) + 1
        ff = (f[j] - fmin) / (fmax - fmin)
        ff_old = np.copy(ff)
    else:
        fff = middle(
            f, order, iterations=iterations, eps=eps, weight=weight, lambda2=lambda2
        )
        fmin = min(f) - 1
        fmax = max(f) + 1
        fff = (fff - fmin) / (fmax - fmin)
        ff = (f - fmin) / (fmax - fmin) / fff
        ff_old = np.copy(ff)

    for _ in range(iterations):
        if poly:
            if order > 0:  # this is a bug in rsi poly routine
                t = median_filter(np.polyval(np.polyfit(xx, ff, order), xx), 3)
                tmp = np.clip(t - ff, 0, None) ** 2
                tmp = np.polyval(np.polyfit(xx, tmp, order), xx)
                dev = np.sqrt(np.nan_to_num(tmp))
            else:
                t = np.tile(np.polyfit(xx, ff, order), len(f))
                tmp = np.tile(
                    np.polyfit(xx, np.clip(t - ff, 0, None) ** 2, order), len(f)
                )
                dev = np.sqrt(np.nan_to_num(tmp))
        else:
            t = median_filter(opt_filter(ff, order, weight=weight, lambda2=lambda2), 3)
            dev = np.sqrt(
                opt_filter(
                    np.clip(weight * (t - ff), 0, None),
                    order,
                    weight=weight,
                    lambda2=lambda2,
                )
            )
        ff = np.clip(
            np.clip(t - dev, ff, None), None, t
        )  # the order matters, t dominates
        dev2 = np.max(weight * np.abs(ff - ff_old))
        ff_old = ff
        if dev2 <= eps:
            break

    if poly:
        if order > 0:  # this is a bug in rsi poly routine
            t = median_filter(np.polyval(np.polyfit(xx, ff, order), xx), 3)
        else:
            t = np.tile(np.polyfit(xx, ff, order), len(f))
        return t * (fmax - fmin) + fmin
    else:
        return t * fff * (fmax - fmin) + fmin

def middle(
    f,
    param,
    x=None,
    iterations=40,
    eps=0.001,
    poly=False,
    weight=1,
    lambda2=-1,
    mn=None,
    mx=None,
):
    """
    middle tries to fit a smooth curve that is located along the "middle"
    of 1D data array f. Filter size "param" together with the total number
    of iterations determine the smoothness and the quality of the fit.
    The total number of iterations can be controlled by limiting the maximum
    number of iterations (iterations) and/or by setting the convergence
    criterion for the fit (eps).

    04-Nov-2000 N.Piskunov wrote.
    09-Nov-2011 NP added weights and 2nd derivative constraint as LAM2

    Parameters
    ----------
    f : array[n]
        1D data array to fit
    param : int
        smoothing parameter of the optimal filter (or polynomial degree if poly is True)
    iterations : int
        maximum number of iterations (default: 40)
    eps : float
        convergence level (default: 0.001)
    mn : float
        minimum function values to be considered (default: min(f))
    mx : float
        maximum function values to be considered (default: max(f))
    lambda2 : float
        constraint on 2nd derivative
    weight : array(float)
        vector of weights
    """
    mn = mn if mn is not None else np.min(f)
    mx = mx if mx is not None else np.max(f)

    f = np.asarray(f)

    if x is None:
        xx = np.linspace(-1, 1, num=f.size)
    else:
        xx = np.asarray(x)

    if poly:
        j = (f >= mn) & (f <= mx)
        n = np.count_nonzero(j)
        if n <= round(param):
            return f
        fmin = np.min(f[j]) - 1
        fmax = np.max(f[j]) + 1
        ff = (f[j] - fmin) / (fmax - fmin)
        ff_old = ff
    else:
        fmin = np.min(f) - 1
        fmax = np.max(f) + 1
        ff = (f - fmin) / (fmax - fmin)
        ff_old = ff
        n = len(f)

    for _ in range(iterations):
        if poly:
            param = round(param)
            if param > 0:
                t = median_filter(np.polyval(np.polyfit(xx, ff, param), xx), 3)
                tmp = np.polyval(np.polyfit(xx, (t - ff) ** 2, param), xx)
            else:
                t = np.tile(np.polyfit(xx, ff, param), len(f))
                tmp = np.tile(np.polyfit(xx, (t - ff) ** 2, param), len(f))
        else:
            t = median_filter(opt_filter(ff, param, weight=weight, lambda2=lambda2), 3)
            tmp = opt_filter(
                weight * (t - ff) ** 2, param, weight=weight, lambda2=lambda2
            )
        dev = np.sqrt(np.clip(tmp, 0, None))
        ff = np.clip(t - dev, ff, t + dev)
        dev2 = np.max(weight * np.abs(ff - ff_old))
        ff_old = ff
        if dev2 <= eps:
            break

    if poly:
        xx = np.linspace(-1, 1, len(f))
        if param > 0:
            t = median_filter(np.polyval(np.polyfit(xx, ff, param), xx), 3)
        else:
            t = np.tile(np.polyfit(xx, ff, param), len(f))

    return t * (fmax - fmin) + fmin

def top(
    f,
    order=1,
    iterations=40,
    eps=0.001,
    poly=False,
    weight=1,
    lambda2=-1,
    mn=None,
    mx=None,
):
    """
    top tries to fit a smooth curve to the upper envelope of 1D data array f.
    Filter size "order" together with the total number of iterations determine
    the smoothness and the quality of the fit. The total number of iterations
    can be controlled by limiting the maximum number of iterations (iterations)
    and/or by setting the convergence criterion for the fit (eps).

    04-Nov-2000 N.Piskunov wrote.
    09-Nov-2011 NP added weights and 2nd derivative constraint as LAM2

    Parameters
    ----------
    f : array[n]
        1D data array to fit
    order : int
        smoothing parameter of the optimal filter (or polynomial degree if poly is True)
    iterations : int
        maximum number of iterations (default: 40)
    eps : float
        convergence level (default: 0.001)
    mn : float
        minimum function values to be considered (default: min(f))
    mx : float
        maximum function values to be considered (default: max(f))
    lambda2 : float
        constraint on 2nd derivative
    weight : array(float)
        vector of weights
    """
    mn = mn if mn is not None else np.min(f)
    mx = mx if mx is not None else np.max(f)

    f = np.asarray(f)
    xx = np.linspace(-1, 1, num=f.size)

    if poly:
        j = (f >= mn) & (f <= mx)
        if np.count_nonzero(j) <= round(order):
            raise ValueError("Not enough points")
        fmin = np.min(f[j]) - 1
        fmax = np.max(f[j]) + 1
        ff = (f - fmin) / (fmax - fmin)
        ff_old = ff
    else:
        fff = middle(
            f, order, iterations=iterations, eps=eps, weight=weight, lambda2=lambda2
        )
        fmin = np.min(f) - 1
        fmax = np.max(f) + 1
        fff = (fff - fmin) / (fmax - fmin)
        ff = (f - fmin) / (fmax - fmin) / fff
        ff_old = ff

    for _ in range(iterations):
        order = round(order)
        if poly:
            t = median_filter(np.polyval(np.polyfit(xx, ff, order), xx), 3)
            tmp = np.polyval(np.polyfit(xx, np.clip(ff - t, 0, None) ** 2, order), xx)
            dev = np.sqrt(np.clip(tmp, 0, None))
        else:
            t = median_filter(opt_filter(ff, order, weight=weight, lambda2=lambda2), 3)
            tmp = opt_filter(
                np.clip(weight * (ff - t), 0, None),
                order,
                weight=weight,
                lambda2=lambda2,
            )
            dev = np.sqrt(np.clip(tmp, 0, None))

        ff = np.clip(t - eps, ff, t + dev * 3)
        dev2 = np.max(weight * np.abs(ff - ff_old))
        ff_old = ff
        if dev2 <= eps:
            break

    if poly:
        t = median_filter(np.polyval(np.polyfit(xx, ff, order), xx), 3)
        return t * (fmax - fmin) + fmin
    else:
        return t * fff * (fmax - fmin) + fmin

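# Usage sketch (synthetic data): with poly=True the envelope fits reduce to
# iterated polynomial fits; the upper envelope lies above the lower one.
# >>> rng = np.random.default_rng(0)
# >>> data = 10 + np.sin(np.linspace(0, 6, 500)) + rng.normal(0, 0.1, 500)
# >>> bool(np.mean(top(data, 2, poly=True)) > np.mean(bottom(data, 2, poly=True)))
# True
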
def opt_filter(y, par, par1=None, weight=None, lambda2=-1, maxiter=100):
    """
    Optimal filtering of 1D and 2D arrays.
    Uses tridiag in 1D case and sprsin and linbcg in 2D case.
    Written by N.Piskunov 8-May-2000

    Parameters
    ----------
    y : array
        1d or 2d array to filter
    par : int
        filter width (for a 2d array the width in x direction (1st index))
    par1 : int
        (2d arrays only) filter width in y direction (2nd index);
        if missing it is set equal to par
    weight : array(float)
        an array of the same size(s) as y containing values between 0 and 1
    lambda2 : float
        regularization parameter
    maxiter : int
        maximum number of iterations for the filtering of a 2d array
    """
    y = np.asarray(y)

    if y.ndim not in [1, 2]:
        raise ValueError("Input y must have 1 or 2 dimensions")

    if par < 1:
        par = 1

    # 1D case
    if y.ndim == 1 or (y.ndim == 2 and (y.shape[0] == 1 or y.shape[1] == 1)):
        y = y.ravel()
        n = y.size

        if weight is None:
            weight = np.ones(n)
        elif np.isscalar(weight):
            weight = np.full(n, weight)
        else:
            weight = weight[:n]

        if lambda2 > 0:
            # Apply regularization lambda
            aij = np.zeros((5, n))
            # 2nd upper subdiagonal
            aij[0, 2:] = lambda2
            # Upper subdiagonal
            aij[1, 1] = -par - 2 * lambda2
            aij[1, 2:-1] = -par - 4 * lambda2
            aij[1, -1] = -par - 2 * lambda2
            # Main diagonal
            aij[2, 0] = weight[0] + par + lambda2
            aij[2, 1] = weight[1] + 2 * par + 5 * lambda2
            aij[2, 2:-2] = weight[2:-2] + 2 * par + 6 * lambda2
            aij[2, -2] = weight[-2] + 2 * par + 5 * lambda2
            aij[2, -1] = weight[-1] + par + lambda2
            # Lower subdiagonal
            aij[3, 0] = -par - 2 * lambda2
            aij[3, 1:-2] = -par - 4 * lambda2
            aij[3, -2] = -par - 2 * lambda2
            # 2nd lower subdiagonal
            aij[4, 0:-2] = lambda2
            # RHS
            b = weight * y

            f = solve_banded((2, 2), aij, b)
        else:
            a = np.full(n, -abs(par))
            b = np.copy(weight) + abs(par)
            b[1:-1] += abs(par)
            aba = np.array([a, b, a])

            f = solve_banded((1, 1), aba, weight * y)

        return f
    else:
        # 2D case
        # NOTE: this branch is a direct port of the original IDL logic and has
        # known rough edges; only obvious defects (np.arange typo, banded matrix
        # shape, undefined rhs variable, missing weight default) are fixed here.
        if par1 is None:
            par1 = par
        if par == 0 and par1 == 0:
            raise ValueError("par and par1 can't both be 0")
        nx, ny = y.shape
        if weight is None:
            weight = np.ones((nx, ny))
        lam_x = abs(par)
        lam_y = abs(par1)

        n = nx * ny
        ndiag = 2 * nx + 1
        aij = np.zeros((ndiag, n))
        # Main diagonal (row nx of the banded storage)
        aij[nx, 0] = weight[0, 0] + lam_x + lam_y
        aij[nx, 1:nx] = weight[0, 1:nx] + 2 * lam_x + lam_y
        aij[nx, nx : n - nx] = weight[1 : ny - 1].ravel() + 2 * (lam_x + lam_y)
        aij[nx, n - nx : n - 1] = weight[ny - 1, 0 : nx - 1] + 2 * lam_x + lam_y
        aij[nx, n - 1] = weight[ny - 1, nx - 1] + lam_x + lam_y

        aij[nx - 1, 1:n] = -lam_x
        aij[nx + 1, 0 : n - 1] = -lam_x

        ind = np.arange(ny - 1) * nx + nx + nx * n
        aij.flat[ind - 1] -= lam_x
        aij.flat[ind] -= lam_x

        ind = np.arange(ny - 1) * nx + nx
        aij[nx + 1, ind - 1] = 0
        aij[nx - 1, ind] = 0

        aij[0, nx:n] = -lam_y
        aij[ndiag - 1, 0 : n - nx] = -lam_y

        rhs = (y * weight).ravel()
        model = solve_banded((nx, nx), aij, rhs)
        model = np.reshape(model, (ny, nx))
        return model

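# Usage sketch (synthetic data): in 1D, opt_filter solves a banded linear
# system that trades data fidelity against smoothness; larger par smooths more.
# >>> rng = np.random.default_rng(0)
# >>> noisy = np.sin(np.linspace(0, 6, 200)) + rng.normal(0, 0.2, 200)
# >>> smooth = opt_filter(noisy, 20)
# >>> bool(np.std(np.diff(smooth)) < np.std(np.diff(noisy)))
# True
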
def helcorr(obs_long, obs_lat, obs_alt, ra2000, dec2000, jd, system="barycentric"):
    """
    Calculates the heliocentric Julian date, as well as the barycentric or
    heliocentric radial velocity correction, using astropy functions

    Parameters
    ----------
    obs_long : float
        Longitude of observatory (degrees, western direction is positive)
    obs_lat : float
        Latitude of observatory (degrees)
    obs_alt : float
        Altitude of observatory (meters)
    ra2000 : float
        Right ascension of object for epoch 2000.0 (hours)
    dec2000 : float
        Declination of object for epoch 2000.0 (degrees)
    jd : float
        Julian date for the middle of exposure in MJD
    system : {"barycentric", "heliocentric"}, optional
        reference system of the result, barycentric: around earth-sun gravity center,
        heliocentric: around sun, usually barycentric is preferred (default: "barycentric")

    Returns
    -------
    correction : float
        radial velocity correction due to barycentre offset
    hjd : float
        Heliocentric Julian date for middle of exposure
    """
    # jd = 2400000.5 + jd
    jd = time.Time(jd, format="mjd")

    ra = coord.Longitude(ra2000, unit=u.hour)
    dec = coord.Latitude(dec2000, unit=u.degree)

    observatory = coord.EarthLocation.from_geodetic(obs_long, obs_lat, height=obs_alt)
    sky_location = coord.SkyCoord(ra, dec, obstime=jd, location=observatory)
    times = time.Time(jd, location=observatory)

    if system == "barycentric":
        correction = sky_location.radial_velocity_correction().to(u.km / u.s).value
        ltt = times.light_travel_time(sky_location)
    elif system == "heliocentric":
        correction = (
            sky_location.radial_velocity_correction("heliocentric")
            .to(u.km / u.s)
            .value
        )
        ltt = times.light_travel_time(sky_location, "heliocentric")
    else:
        raise AttributeError(
            "Could not parse system, values are: ('barycentric', 'heliocentric')"
        )

    times = (times.utc + ltt).value - 2400000
    return -correction, times

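# Usage sketch (illustrative site and target, not a PyReduce default): compute
# the barycentric correction for a made-up observation; astropy may fetch IERS
# tables on first use.
# >>> rv_corr, hjd = helcorr(70.73, -29.26, 2400.0, 6.5, -45.0, 58000.0)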