Source code for pyreduce.cwrappers

# -*- coding: utf-8 -*-
"""
Wrapper for REDUCE C functions

This module provides access to the extraction algorithms in the
C libraries and sanitizes the input parameters.

"""
import ctypes
import logging

import matplotlib.pyplot as plt
import numpy as np
from scipy.ndimage import median_filter

logger = logging.getLogger(__name__)

try:
    from .clib._slitfunc_2d import lib as slitfunc_2dlib
    from .clib._slitfunc_bd import ffi
    from .clib._slitfunc_bd import lib as slitfunclib
except ImportError:  # pragma: no cover
    logger.error(
        "C libraries could not be found. Compiling them by running build_extract.py"
    )
    from .clib import build_extract

    build_extract.build()
    del build_extract

    from .clib._slitfunc_2d import ffi
    from .clib._slitfunc_2d import lib as slitfunc_2dlib
    from .clib._slitfunc_bd import lib as slitfunclib


c_double = ctypes.c_double
c_int = ctypes.c_int
c_mask = ctypes.c_ubyte


[docs]def slitfunc(img, ycen, lambda_sp=0, lambda_sf=0.1, osample=1): """Decompose image into spectrum and slitfunction This is for horizontal straight orders only, for curved orders use slitfunc_curved instead Parameters ---------- img : array[n, m] image to decompose, should just contain a small part of the overall image ycen : array[n] traces the center of the order along the image, relative to the center of the image? lambda_sp : float, optional smoothing parameter of the spectrum (the default is 0, which no smoothing) lambda_sf : float, optional smoothing parameter of the slitfunction (the default is 0.1, which ) osample : int, optional Subpixel ovsersampling factor (the default is 1, which no oversampling) Returns ------- sp, sl, model, unc spectrum, slitfunction, model, spectrum uncertainties """ # Convert input to expected datatypes lambda_sf = float(lambda_sf) lambda_sp = float(lambda_sp) osample = int(osample) img = np.asanyarray(img, dtype=c_double) ycen = np.asarray(ycen, dtype=c_double) assert img.ndim == 2, "Image must be 2 dimensional" assert ycen.ndim == 1, "Ycen must be 1 dimensional" assert ( img.shape[1] == ycen.size ), f"Image and Ycen shapes are incompatible, got {img.shape} and {ycen.shape}" assert osample > 0, f"Oversample rate must be positive, but got {osample}" assert ( lambda_sf >= 0 ), f"Slitfunction smoothing must be positive, but got {lambda_sf}" assert lambda_sp >= 0, f"Spectrum smoothing must be positive, but got {lambda_sp}" # Get some derived values nrows, ncols = img.shape ny = osample * (nrows + 1) + 1 ycen = ycen - ycen.astype(c_int) # Prepare all arrays # Inital guess for slit function and spectrum sp = np.ma.sum(img, axis=0) requirements = ["C", "A", "W", "O"] sp = np.require(sp, dtype=c_double, requirements=requirements) sl = np.zeros(ny, dtype=c_double) mask = ~np.ma.getmaskarray(img) mask = np.require(mask, dtype=c_int, requirements=requirements) img = np.ma.getdata(img) img = np.require(img, dtype=c_double, requirements=requirements) pix_unc = np.zeros_like(img) pix_unc = np.require(pix_unc, dtype=c_double, requirements=requirements) ycen = np.require(ycen, dtype=c_double, requirements=requirements) model = np.zeros((nrows, ncols), dtype=c_double) unc = np.zeros(ncols, dtype=c_double) # Call the C function slitfunclib.slit_func_vert( ffi.cast("int", ncols), ffi.cast("int", nrows), ffi.cast("double *", img.ctypes.data), ffi.cast("double *", pix_unc.ctypes.data), ffi.cast("int *", mask.ctypes.data), ffi.cast("double *", ycen.ctypes.data), ffi.cast("int", osample), ffi.cast("double", lambda_sp), ffi.cast("double", lambda_sf), ffi.cast("double *", sp.ctypes.data), ffi.cast("double *", sl.ctypes.data), ffi.cast("double *", model.ctypes.data), ffi.cast("double *", unc.ctypes.data), ) mask = ~mask.astype(bool) return sp, sl, model, unc, mask
[docs]def slitfunc_curved( img, ycen, tilt, shear, lambda_sp, lambda_sf, osample, yrange, maxiter=20, gain=1 ): """Decompose an image into a spectrum and a slitfunction, image may be curved Parameters ---------- img : array[n, m] input image ycen : array[n] traces the center of the order tilt : array[n] tilt (1st order curvature) of the order along the image, set to 0 if order straight shear : array[n] shear (2nd order curvature) of the order along the image, set to 0 if order straight osample : int Subpixel ovsersampling factor (the default is 1, no oversampling) lambda_sp : float smoothing factor spectrum (the default is 0, no smoothing) lambda_sl : float smoothing factor slitfunction (the default is 0.1, small smoothing) yrange : array[2] number of pixels below and above the central line that have been cut out maxiter : int, optional maximumim number of iterations, by default 20 gain : float, optional gain of the image, by default 1 Returns ------- sp, sl, model, unc spectrum, slitfunction, model, spectrum uncertainties """ # Convert datatypes to expected values lambda_sf = float(lambda_sf) lambda_sp = float(lambda_sp) osample = int(osample) maxiter = int(maxiter) img = np.asanyarray(img, dtype=c_double) ycen = np.asarray(ycen, dtype=c_double) yrange = np.asarray(yrange, dtype=int) assert img.ndim == 2, "Image must be 2 dimensional" assert ycen.ndim == 1, "Ycen must be 1 dimensional" assert maxiter > 0, "Maximum iterations must be positive" if np.isscalar(tilt): tilt = np.full(img.shape[1], tilt, dtype=c_double) else: tilt = np.asarray(tilt, dtype=c_double) if np.isscalar(shear): shear = np.full(img.shape[1], shear, dtype=c_double) else: shear = np.asarray(shear, dtype=c_double) assert ( img.shape[1] == ycen.size ), "Image and Ycen shapes are incompatible, got {} and {}".format( img.shape, ycen.shape ) assert ( img.shape[1] == tilt.size ), "Image and Tilt shapes are incompatible, got {} and {}".format( img.shape, tilt.shape ) assert ( img.shape[1] == shear.size ), "Image and Shear shapes are incompatible, got {} and {}".format( img.shape, shear.shape, ) assert osample > 0, f"Oversample rate must be positive, but got {osample}" assert ( lambda_sf >= 0 ), f"Slitfunction smoothing must be positive, but got {lambda_sf}" assert lambda_sp >= 0, f"Spectrum smoothing must be positive, but got {lambda_sp}" # assert np.ma.all(np.isfinite(img)), "All values in the image must be finite" assert np.all(np.isfinite(ycen)), "All values in ycen must be finite" assert np.all(np.isfinite(tilt)), "All values in tilt must be finite" assert np.all(np.isfinite(shear)), "All values in shear must be finite" assert yrange.ndim == 1, "Yrange must be 1 dimensional" assert yrange.size == 2, "Yrange must have 2 elements" assert ( yrange[0] + yrange[1] + 1 == img.shape[0] ), "Yrange must cover the whole image" assert yrange[0] >= 0, "Yrange must be positive" assert yrange[1] >= 0, "Yrange must be positive" # Retrieve some derived values nrows, ncols = img.shape ny = osample * (nrows + 1) + 1 ycen_offset = ycen.astype(c_int) ycen_int = ycen - ycen_offset y_lower_lim = int(yrange[0]) mask = np.ma.getmaskarray(img) img = np.ma.getdata(img) mask2 = ~np.isfinite(img) img[mask2] = 0 mask |= ~np.isfinite(img) # sp should never be all zero (thats a horrible guess) and leads to all nans # This is a simplified run of the algorithm without oversampling or curvature # But strong smoothing # To remove the most egregious outliers, which would ruin the fit sp = np.sum(img, axis=0) median_filter(sp, 5, output=sp) sl = np.median(img, axis=1) sl /= np.sum(sl) model = sl[:, None] * sp[None, :] diff = model - img mask[np.abs(diff) > 10 * diff.std()] = True sp = np.sum(img, axis=0) mask = np.where(mask, c_int(0), c_int(1)) # Determine the shot noise # by converting electrons to photonsm via the gain pix_unc = np.nan_to_num(np.abs(img), copy=False) pix_unc *= gain np.sqrt(pix_unc, out=pix_unc) pix_unc[pix_unc < 1] = 1 psf_curve = np.zeros((ncols, 3), dtype=c_double) psf_curve[:, 1] = tilt psf_curve[:, 2] = shear # Initialize arrays and ensure the correct datatype for C requirements = ["C", "A", "W", "O"] sp = np.require(sp, dtype=c_double, requirements=requirements) mask = np.require(mask, dtype=c_mask, requirements=requirements) img = np.require(img, dtype=c_double, requirements=requirements) pix_unc = np.require(pix_unc, dtype=c_double, requirements=requirements) ycen_int = np.require(ycen_int, dtype=c_double, requirements=requirements) ycen_offset = np.require(ycen_offset, dtype=c_int, requirements=requirements) # This memory could be reused between swaths sl = np.zeros(ny, dtype=c_double) model = np.zeros((nrows, ncols), dtype=c_double) unc = np.zeros(ncols, dtype=c_double) # Info contains the folowing: sucess, cost, status, iteration, delta_x info = np.zeros(5, dtype=c_double) col = np.sum(mask, axis=0) == 0 if np.any(col): mask[mask.shape[0] // 2, col] = 1 # assert not np.any(np.sum(mask, axis=0) == 0), "At least one mask column is all 0." # Call the C function slitfunc_2dlib.slit_func_curved( ffi.cast("int", ncols), ffi.cast("int", nrows), ffi.cast("int", ny), ffi.cast("double *", img.ctypes.data), ffi.cast("double *", pix_unc.ctypes.data), ffi.cast("unsigned char *", mask.ctypes.data), ffi.cast("double *", ycen_int.ctypes.data), ffi.cast("int *", ycen_offset.ctypes.data), ffi.cast("int", y_lower_lim), ffi.cast("int", osample), ffi.cast("double", lambda_sp), ffi.cast("double", lambda_sf), ffi.cast("int", maxiter), ffi.cast("double *", psf_curve.ctypes.data), ffi.cast("double *", sp.ctypes.data), ffi.cast("double *", sl.ctypes.data), ffi.cast("double *", model.ctypes.data), ffi.cast("double *", unc.ctypes.data), ffi.cast("double *", info.ctypes.data), ) if np.any(np.isnan(sp)): logger.error("NaNs in the spectrum") # The decomposition failed if info[0] == 0: status = info[2] if status == 0: msg = "I dont't know what happened" elif status == -1: msg = f"Did not finish convergence after maxiter ({maxiter}) iterations" elif status == -2: msg = "Curvature is larger than the swath. Check the curvature!" else: msg = f"Check the C code, for status = {status}" logger.error(msg) # raise RuntimeError(msg) mask = mask == 0 return sp, sl, model, unc, mask, info