Source code for pyreduce.cwrappers

"""
Wrapper for the slit-decomposition C function.

This module provides access to the slitdec extraction algorithm in the
CFFI-compiled C library (clib/slitdec.c, copied from charslit) and sanitizes
the input parameters.

Mask convention here is charslit's: 0 = bad pixel, 1 = good pixel. No
conversion is done; callers pass and receive masks in that convention.
"""

import ctypes
import logging

import numpy as np

logger = logging.getLogger(__name__)

from .clib._slitdec import ffi as ffi_slitdec
from .clib._slitdec import lib as slitdeclib

c_double = ctypes.c_double
c_int = ctypes.c_int
c_mask = ctypes.c_ubyte


[docs] def slitdec( im, pix_unc, mask, ycen, slitcurve, slitdeltas, osample=6, lambda_sP=0.0, lambda_sL=1.0, maxiter=20, kappa=10.0, preset_slitfunc=None, ): """Slit decomposition with slit characterization (charslit algorithm). Plain-C port of charslit's ``slitdec``, compiled via CFFI. The signature and returned dict match the nanobind ``charslit.slitdec`` so this is a drop-in replacement for the external package. Mask convention here is charslit's (0 = bad, 1 = good); no conversion is done. When ``preset_slitfunc`` is given (length ``ny`` used directly, or ``nrows`` interpolated up), the slit-function solve is skipped and only the spectrum is fit against it (single-pass extraction); the preset is normalized to sum ``osample`` inside the C code. Returns ------- dict with keys spectrum, slitfunction, model, uncertainty, info, mask, return_code. """ requirements = ["C", "A", "W", "O"] im = np.require(im, dtype=c_double, requirements=requirements) pix_unc = np.require(pix_unc, dtype=c_double, requirements=requirements) nrows, ncols = im.shape if pix_unc.shape != (nrows, ncols): raise ValueError("pix_unc must have same shape as im") if mask.shape != (nrows, ncols): raise ValueError("mask must have same shape as im") if ycen.shape[0] != ncols: raise ValueError("ycen must have length ncols") slitcurve = np.asarray(slitcurve, dtype=c_double) n_coeffs = slitcurve.shape[1] if slitcurve.shape[0] != ncols or not (1 <= n_coeffs <= 6): raise ValueError("slitcurve must have shape (ncols, n) where 1 <= n <= 6") osample = int(osample) ny = osample * (nrows + 1) + 1 # slitdeltas may be given per-row (nrows) or per-subpixel (ny). Per-row is # linearly interpolated up to ny, matching the charslit wrapper. slitdeltas = np.asarray(slitdeltas, dtype=c_double).ravel() if slitdeltas.size == nrows: pos = np.arange(ny) * (nrows - 1.0) / (ny - 1.0) slitdeltas_ny = np.interp(pos, np.arange(nrows), slitdeltas) elif slitdeltas.size == ny: slitdeltas_ny = slitdeltas.copy() else: raise ValueError( "slitdeltas must have length nrows or ny = osample * (nrows + 1) + 1" ) slitdeltas_ny = np.require(slitdeltas_ny, dtype=c_double, requirements=requirements) # Pad slitcurve to 6 coefficients per column (C code uses stride 6) slitcurve_padded = np.zeros((ncols, 6), dtype=c_double) slitcurve_padded[:, :n_coeffs] = slitcurve slitcurve_padded = np.require( slitcurve_padded, dtype=c_double, requirements=requirements ) # The algorithm modifies mask and ycen in place, so pass copies. mask_copy = np.require(mask, dtype=c_mask, requirements=requirements).copy() ycen_copy = np.require(ycen, dtype=c_double, requirements=requirements).copy() # Seed sL. With a preset slit function we copy it in and tell the C code to # skip the sL solve (single-pass extraction); the preset may be length ny # (used directly) or nrows (interpolated up, mirroring slitdeltas). The C # code normalizes it to sum osample. Otherwise seed the flat 1/osample guess. use_preset = 0 if preset_slitfunc is not None: use_preset = 1 ps = np.asarray(preset_slitfunc, dtype=c_double).ravel() if ps.size == ny: sL = ps.copy() elif ps.size == nrows: pos = np.arange(ny) * (nrows - 1.0) / (ny - 1.0) sL = np.interp(pos, np.arange(nrows), ps) else: raise ValueError( "preset_slitfunc must have length nrows or ny = osample * (nrows + 1) + 1" ) sL = np.require(sL, dtype=c_double, requirements=requirements) else: sL = np.full(ny, 1.0 / osample, dtype=c_double) # Output arrays, with the same starting guesses as the charslit wrapper. sP = np.ones(ncols, dtype=c_double) model = np.zeros((nrows, ncols), dtype=c_double) unc = np.zeros(ncols, dtype=c_double) info = np.zeros(5, dtype=c_double) return_code = slitdeclib.slitdec( ffi_slitdec.cast("int", ncols), ffi_slitdec.cast("int", nrows), ffi_slitdec.cast("double *", im.ctypes.data), ffi_slitdec.cast("double *", pix_unc.ctypes.data), ffi_slitdec.cast("unsigned char *", mask_copy.ctypes.data), ffi_slitdec.cast("double *", ycen_copy.ctypes.data), ffi_slitdec.cast("double *", slitcurve_padded.ctypes.data), ffi_slitdec.cast("double *", slitdeltas_ny.ctypes.data), ffi_slitdec.cast("int", osample), ffi_slitdec.cast("double", float(lambda_sP)), ffi_slitdec.cast("double", float(lambda_sL)), ffi_slitdec.cast("int", int(maxiter)), ffi_slitdec.cast("double", float(kappa)), ffi_slitdec.cast("int", use_preset), ffi_slitdec.cast("double *", sP.ctypes.data), ffi_slitdec.cast("double *", sL.ctypes.data), ffi_slitdec.cast("double *", model.ctypes.data), ffi_slitdec.cast("double *", unc.ctypes.data), ffi_slitdec.cast("double *", info.ctypes.data), ) return { "spectrum": sP, "slitfunction": sL, "model": model, "uncertainty": unc, "info": info, "mask": mask_copy, "return_code": int(return_code), }