# -*- coding: utf-8 -*-
"""
REDUCE script for spectrograph data
Authors
-------
Ansgar Wehrhahn (ansgar.wehrhahn@physics.uu.se)
Thomas Marquart (thomas.marquart@physics.uu.se)
Alexis Lavail (alexis.lavail@physics.uu.se)
Nikolai Piskunov (nikolai.piskunov@physics.uu.se)
Version
-------
1.0 - Initial PyReduce
License
--------
...
"""
import glob
import logging
import os.path
from itertools import product
from os.path import dirname, join
import joblib
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from genericpath import exists
from tqdm import tqdm
from pyreduce import wavelength_calibration
# PyReduce subpackages
from . import __version__, echelle, instruments, util
from .combine_frames import (
combine_bias,
combine_calibrate,
combine_frames,
combine_polynomial,
)
from .configuration import load_config
from .continuum_normalization import continuum_normalize, splice_orders
from .estimate_background_scatter import estimate_background_scatter
from .extract import extract
from .extraction_width import estimate_extraction_width
from .instruments.instrument_info import load_instrument
from .make_shear import Curvature as CurvatureModule
from .rectify import merge_images, rectify_image
from .trace_orders import mark_orders
from .wavelength_calibration import LineList
from .wavelength_calibration import WavelengthCalibration as WavelengthCalibrationModule
from .wavelength_calibration import WavelengthCalibrationComb
from .wavelength_calibration import (
WavelengthCalibrationInitialize as WavelengthCalibrationInitializeModule,
)
# TODO Naming of functions and modules
# TODO License
# TODO automatic determination of the extraction width
logger = logging.getLogger(__name__)
[docs]def main(
instrument,
target,
night=None,
modes=None,
steps="all",
base_dir=None,
input_dir=None,
output_dir=None,
configuration=None,
order_range=None,
allow_calibration_only=False,
skip_existing=False,
):
r"""
Main entry point for REDUCE scripts,
default values can be changed as required if reduce is used as a script
Finds input directories, and loops over observation nights and instrument modes
Parameters
----------
instrument : str, list[str]
instrument used for the observation (e.g. UVES, HARPS)
target : str, list[str]
the observed star, as named in the folder structure/fits headers
night : str, list[str]
the observation nights to reduce, as named in the folder structure. Accepts bash wildcards (i.e. \*, ?), but then relies on the folder structure for restricting the nights
modes : str, list[str], dict[{instrument}:list], None, optional
the instrument modes to use, if None will use all known modes for the current instrument. See instruments for possible options
steps : tuple(str), "all", optional
which steps of the reduction process to perform
the possible steps are: "bias", "flat", "orders", "norm_flat", "wavecal", "science"
alternatively set steps to "all", which is equivalent to setting all steps
Note that the later steps require the previous intermediary products to exist and raise an exception otherwise
base_dir : str, optional
base data directory that Reduce should work in, is prefixxed on input_dir and output_dir (default: use settings_pyreduce.json)
input_dir : str, optional
input directory containing raw files. Can contain placeholders {instrument}, {target}, {night}, {mode} as well as wildcards. If relative will use base_dir as root (default: use settings_pyreduce.json)
output_dir : str, optional
output directory for intermediary and final results. Can contain placeholders {instrument}, {target}, {night}, {mode}, but no wildcards. If relative will use base_dir as root (default: use settings_pyreduce.json)
configuration : dict[str:obj], str, list[str], dict[{instrument}:dict,str], optional
configuration file for the current run, contains parameters for different parts of reduce. Can be a path to a json file, or a dict with configurations for the different instruments. When a list, the order must be the same as instruments (default: settings_{instrument.upper()}.json)
"""
if target is None or np.isscalar(target):
target = [target]
if night is None or np.isscalar(night):
night = [night]
isNone = {
"modes": modes is None,
"base_dir": base_dir is None,
"input_dir": input_dir is None,
"output_dir": output_dir is None,
}
output = []
# Loop over everything
# settings: default settings of PyReduce
# config: paramters for the current reduction
# info: constant, instrument specific parameters
config = load_config(configuration, instrument, 0)
if isinstance(instrument, str):
instrument = instruments.instrument_info.load_instrument(instrument)
info = instrument.info
# load default settings from settings_pyreduce.json
if base_dir is None:
base_dir = config["reduce"]["base_dir"]
if input_dir is None:
input_dir = config["reduce"]["input_dir"]
if output_dir is None:
output_dir = config["reduce"]["output_dir"]
input_dir = join(base_dir, input_dir)
output_dir = join(base_dir, output_dir)
if modes is None:
modes = info["modes"]
if np.isscalar(modes):
modes = [modes]
for t, n, m in product(target, night, modes):
log_file = join(
base_dir.format(instrument=str(instrument), mode=modes, target=t),
"logs/%s.log" % t,
)
util.start_logging(log_file)
# find input files and sort them by type
files = instrument.sort_files(
input_dir,
t,
n,
mode=m,
**config["instrument"],
allow_calibration_only=allow_calibration_only,
)
if len(files) == 0:
logger.warning(
f"No files found for instrument: %s, target: %s, night: %s, mode: %s in folder: %s",
instrument,
t,
n,
m,
input_dir,
)
continue
for k, f in files:
logger.info("Settings:")
for key, value in k.items():
logger.info("%s: %s", key, value)
logger.debug("Files:\n%s", f)
reducer = Reducer(
f,
output_dir,
k.get("target"),
instrument,
m,
k.get("night"),
config,
order_range=order_range,
skip_existing=skip_existing,
)
# try:
data = reducer.run_steps(steps=steps)
output.append(data)
# except Exception as e:
# logger.error("Reduction failed with error message: %s", str(e))
# logger.info("------------")
return output
[docs]class Step:
"""Parent class for all steps"""
def __init__(
self, instrument, mode, target, night, output_dir, order_range, **config
):
self._dependsOn = []
self._loadDependsOn = []
#:str: Name of the instrument
self.instrument = instrument
#:str: Name of the instrument mode
self.mode = mode
#:str: Name of the observation target
self.target = target
#:str: Date of the observation (as a string)
self.night = night
#:tuple(int, int): First and Last(+1) order to process
self.order_range = order_range
#:bool: Whether to plot the results or the progress of this step
self.plot = config.get("plot", False)
#:str: Title used in the plots, if any
self.plot_title = config.get("plot_title", None)
self._output_dir = output_dir
[docs] def run(self, files, *args): # pragma: no cover
"""Execute the current step
This should fail if files are missing or anything else goes wrong.
If the user does not want to run this step, they should not specify it in steps.
Parameters
----------
files : list(str)
data files required for this step
Raises
------
NotImplementedError
needs to be implemented for each step
"""
raise NotImplementedError
[docs] def save(self, *args): # pragma: no cover
"""Save the results of this step
Parameters
----------
*args : obj
things to save
Raises
------
NotImplementedError
Needs to be implemented for each step
"""
raise NotImplementedError
[docs] def load(self): # pragma: no cover
"""Load results from a previous execution
If this raises a FileNotFoundError, run() will be used instead
For calibration steps it is preferred however to print a warning
and return None. Other modules can then use a default value instead.
Raises
------
NotImplementedError
Needs to be implemented for each step
"""
raise NotImplementedError
@property
def dependsOn(self):
"""list(str): Steps that are required before running this step"""
return list(set(self._dependsOn))
@property
def loadDependsOn(self):
"""list(str): Steps that are required before loading data from this step"""
return list(set(self._loadDependsOn))
@property
def output_dir(self):
"""str: output directory, may contain tags {instrument}, {night}, {target}, {mode}"""
return self._output_dir.format(
instrument=self.instrument.name.upper(),
target=self.target,
night=self.night,
mode=self.mode,
)
@property
def prefix(self):
"""str: temporary file prefix"""
i = self.instrument.name.lower()
if self.mode is not None and self.mode != "":
m = self.mode.lower()
return f"{i}_{m}"
else:
return i
[docs]class CalibrationStep(Step):
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["mask", "bias"]
#:{'number_of_files', 'exposure_time', 'mean', 'median', 'none'}: how to adjust for diferences between the bias and flat field exposure times
self.bias_scaling = config["bias_scaling"]
#:{'divide', 'none'}: how to apply the normalized flat field
self.norm_scaling = config["norm_scaling"]
[docs] def calibrate(self, files, mask, bias=None, norm_flat=None):
bias, bhead = bias if bias is not None else (None, None)
norm, blaze = norm_flat if norm_flat is not None else (None, None)
orig, thead = combine_calibrate(
files,
self.instrument,
self.mode,
mask,
bias=bias,
bhead=bhead,
norm=norm,
bias_scaling=self.bias_scaling,
norm_scaling=self.norm_scaling,
plot=self.plot,
plot_title=self.plot_title,
)
return orig, thead
[docs]class FitsIOStep(Step):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._loadDependsOn += ["mask"]
self.allow_failure = True
[docs] def save(self, data, head, dtype=None):
"""
Save the data to a FITS file
Parameters
----------
data : array of shape (nrow, ncol)
bias data
head : FITS header
bias header
"""
if dtype is not None:
data = np.asarray(data, dtype=np.float32)
fits.writeto(
self.savefile,
data=data,
header=head,
overwrite=True,
output_verify="silentfix+ignore",
)
logger.info("Created data file: %s", self.savefile)
[docs] def load(self, mask):
"""
Load the master bias from a previous run
Parameters
----------
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
data : masked array of shape (nrow, ncol)
master bias data, with the bad pixel mask applied
head : FITS header
header of the master bias
"""
try:
data = fits.open(self.savefile)[0]
data, head = data.data, data.header
data = np.ma.masked_array(data, mask=mask)
logger.info("Data file: %s", self.savefile)
except FileNotFoundError as ex:
if self.allow_failure:
logger.warning("No data file found")
data, head = None, None
else:
raise ex
return data, head
[docs]class Mask(Step):
"""Load the bad pixel mask for the given instrument/mode"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
[docs] def run(self):
"""Load the mask file from disk
Returns
-------
mask : array of shape (nrow, ncol)
Bad pixel mask for this setting
"""
return self.load()
[docs] def load(self):
"""Load the mask file from disk
Returns
-------
mask : array of shape (nrow, ncol)
Bad pixel mask for this setting
"""
mask_file = self.instrument.get_mask_filename(mode=self.mode)
try:
mask, _ = self.instrument.load_fits(mask_file, self.mode, extension=0)
mask = ~mask.data.astype(bool) # REDUCE mask are inverse to numpy masks
logger.info("Bad pixel mask file: %s", mask_file)
except (FileNotFoundError, ValueError):
logger.error(
"Bad Pixel Mask datafile %s not found. Using all pixels instead.",
mask_file,
)
mask = False
return mask
[docs]class Bias(Step):
"""Calculates the master bias"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["mask"]
self._loadDependsOn += ["mask"]
#:int: polynomial degree of the fit between exposure time and pixel values
self.degree = config["degree"]
@property
def savefile(self):
"""str: Name of master bias fits file"""
return join(self.output_dir, self.prefix + ".bias.fits")
[docs] def run(self, files, mask):
"""Calculate the master bias
Parameters
----------
files : list(str)
bias files
mask : array of shape (nrow, ncol)
bad pixel map
Returns
-------
bias : masked array of shape (nrow, ncol)
master bias data, with the bad pixel mask applied
bhead : FITS header
header of the master bias
"""
logger.info("Bias Files: %s", files)
if self.degree == 0:
# If the degree is 0, we just combine all images into a single master bias
# this works great if we assume there is no dark at exposure time 0
bias, bhead = combine_bias(
files,
self.instrument,
self.mode,
mask=mask,
plot=self.plot,
plot_title=self.plot_title,
)
else:
# Otherwise we fit a polynomial to each pixel in the image, with
# the pixel value versus the exposure time. The constant coefficients
# are then the bias, and the others are used to scale with the
# exposure time
bias, bhead = combine_polynomial(
files,
self.instrument,
self.mode,
mask=mask,
degree=self.degree,
plot=self.plot,
plot_title=self.plot_title,
)
self.save(bias.data, bhead)
return bias, bhead
[docs] def save(self, bias, bhead):
"""Save the master bias to a FITS file
Parameters
----------
bias : array of shape (nrow, ncol)
bias data
bhead : FITS header
bias header
"""
bias = np.asarray(bias, dtype=np.float32)
if self.degree == 0:
hdus = fits.PrimaryHDU(data=bias, header=bhead)
else:
hdus = [fits.PrimaryHDU(data=bias[0], header=bhead)]
for i in range(1, len(bias)):
hdus += [fits.ImageHDU(data=bias[i])]
hdus = fits.HDUList(hdus)
hdus.writeto(
self.savefile,
overwrite=True,
output_verify="silentfix+ignore",
)
logger.info("Created master bias file: %s", self.savefile)
[docs] def load(self, mask):
"""Load the master bias from a previous run
Parameters
----------
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
bias : masked array of shape (nrow, ncol)
master bias data, with the bad pixel mask applied
bhead : FITS header
header of the master bias
"""
try:
logger.info("Master bias file: %s", self.savefile)
hdu = fits.open(self.savefile)
degree = len(hdu) - 1
if degree == 0:
bias, bhead = hdu[0].data, hdu[0].header
bias = np.ma.masked_array(bias, mask=mask)
else:
bhead = hdu[0].header
bias = np.array([h.data for h in hdu])
bias = np.ma.masked_array(bias, mask=[mask for _ in range(len(hdu))])
except FileNotFoundError:
logger.warning("No intermediate bias file found. Using Bias = 0 instead.")
bias, bhead = None, None
return bias, bhead
[docs]class Flat(CalibrationStep):
"""Calculates the master flat"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._loadDependsOn += ["mask"]
@property
def savefile(self):
"""str: Name of master bias fits file"""
return join(self.output_dir, self.prefix + ".flat.fits")
[docs] def save(self, flat, fhead):
"""Save the master flat to a FITS file
Parameters
----------
flat : array of shape (nrow, ncol)
master flat data
fhead : FITS header
master flat header
"""
flat = np.asarray(flat, dtype=np.float32)
fits.writeto(
self.savefile,
data=flat,
header=fhead,
overwrite=True,
output_verify="silentfix+ignore",
)
logger.info("Created master flat file: %s", self.savefile)
[docs] def run(self, files, bias, mask):
"""Calculate the master flat, with the bias already subtracted
Parameters
----------
files : list(str)
flat files
bias : tuple(array of shape (nrow, ncol), FITS header)
master bias and header
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
flat : masked array of shape (nrow, ncol)
Master flat with bad pixel map applied
fhead : FITS header
Master flat FITS header
"""
logger.info("Flat files: %s", files)
# This is just the calibration of images
flat, fhead = self.calibrate(files, mask, bias, None)
# And then save it
self.save(flat.data, fhead)
return flat, fhead
[docs] def load(self, mask):
"""Load master flat from disk
Parameters
----------
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
flat : masked array of shape (nrow, ncol)
Master flat with bad pixel map applied
fhead : FITS header
Master flat FITS header
"""
try:
flat = fits.open(self.savefile)[0]
flat, fhead = flat.data, flat.header
flat = np.ma.masked_array(flat, mask=mask)
logger.info("Master flat file: %s", self.savefile)
except FileNotFoundError:
logger.warning(
"No intermediate file for the flat field found. Using Flat = 1 instead"
)
flat, fhead = None, None
return flat, fhead
[docs]class OrderTracing(CalibrationStep):
"""Determine the polynomial fits describing the pixel locations of each order"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
#:int: Minimum size of each cluster to be included in further processing
self.min_cluster = config["min_cluster"]
#:int, float: Minimum width of each cluster after mergin
self.min_width = config["min_width"]
#:int: Size of the gaussian filter for smoothing
self.filter_size = config["filter_size"]
#:int: Background noise value threshold
self.noise = config["noise"]
#:int: Polynomial degree of the fit to each order
self.fit_degree = config["degree"]
self.degree_before_merge = config["degree_before_merge"]
self.regularization = config["regularization"]
self.closing_shape = config["closing_shape"]
self.auto_merge_threshold = config["auto_merge_threshold"]
self.merge_min_threshold = config["merge_min_threshold"]
self.sigma = config["split_sigma"]
#:int: Number of pixels at the edge of the detector to ignore
self.border_width = config["border_width"]
#:bool: Whether to use manual alignment
self.manual = config["manual"]
@property
def savefile(self):
"""str: Name of the order tracing file"""
return join(self.output_dir, self.prefix + ".ord_default.npz")
[docs] def run(self, files, mask, bias):
"""Determine polynomial coefficients describing order locations
Parameters
----------
files : list(str)
Observation used for order tracing (should only have one element)
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
orders : array of shape (nord, ndegree+1)
polynomial coefficients for each order
column_range : array of shape (nord, 2)
first and last(+1) column that carries signal in each order
"""
logger.info("Order tracing files: %s", files)
order_img, ohead = self.calibrate(files, mask, bias, None)
orders, column_range = mark_orders(
order_img,
min_cluster=self.min_cluster,
min_width=self.min_width,
filter_size=self.filter_size,
noise=self.noise,
opower=self.fit_degree,
degree_before_merge=self.degree_before_merge,
regularization=self.regularization,
closing_shape=self.closing_shape,
border_width=self.border_width,
manual=self.manual,
auto_merge_threshold=self.auto_merge_threshold,
merge_min_threshold=self.merge_min_threshold,
sigma=self.sigma,
plot=self.plot,
plot_title=self.plot_title,
)
self.save(orders, column_range)
return orders, column_range
[docs] def save(self, orders, column_range):
"""Save order tracing results to disk
Parameters
----------
orders : array of shape (nord, ndegree+1)
polynomial coefficients
column_range : array of shape (nord, 2)
first and last(+1) column that carry signal in each order
"""
np.savez(self.savefile, orders=orders, column_range=column_range)
logger.info("Created order tracing file: %s", self.savefile)
[docs] def load(self):
"""Load order tracing results
Returns
-------
orders : array of shape (nord, ndegree+1)
polynomial coefficients for each order
column_range : array of shape (nord, 2)
first and last(+1) column that carries signal in each order
"""
logger.info("Order tracing file: %s", self.savefile)
data = np.load(self.savefile, allow_pickle=True)
orders = data["orders"]
column_range = data["column_range"]
return orders, column_range
[docs]class BackgroundScatter(CalibrationStep):
"""Determine the background scatter"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["orders"]
#:tuple(int, int): Polynomial degrees for the background scatter fit, in row, column direction
self.scatter_degree = config["scatter_degree"]
self.extraction_width = config["extraction_width"]
self.sigma_cutoff = config["scatter_cutoff"]
self.border_width = config["border_width"]
@property
def savefile(self):
"""str: Name of the scatter file"""
return join(self.output_dir, self.prefix + ".scatter.npz")
[docs] def run(self, files, mask, bias, orders):
logger.info("Background scatter files: %s", files)
scatter_img, shead = self.calibrate(files, mask, bias)
orders, column_range = orders
scatter = estimate_background_scatter(
scatter_img,
orders,
column_range=column_range,
extraction_width=self.extraction_width,
scatter_degree=self.scatter_degree,
sigma_cutoff=self.sigma_cutoff,
border_width=self.border_width,
plot=self.plot,
plot_title=self.plot_title,
)
self.save(scatter)
return scatter
[docs] def save(self, scatter):
"""Save scatter results to disk
Parameters
----------
scatter : array
scatter coefficients
"""
np.savez(self.savefile, scatter=scatter)
logger.info("Created background scatter file: %s", self.savefile)
[docs] def load(self):
"""Load scatter results from disk
Returns
-------
scatter : array
scatter coefficients
"""
try:
data = np.load(self.savefile, allow_pickle=True)
logger.info("Background scatter file: %s", self.savefile)
except FileNotFoundError:
logger.warning(
"No intermediate files found for the scatter. Using scatter = 0 instead."
)
data = {"scatter": None}
scatter = data["scatter"]
return scatter
[docs]class NormalizeFlatField(Step):
"""Calculate the 'normalized' flat field image"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["flat", "orders", "scatter", "curvature"]
#:{'normalize'}: Extraction method to use
self.extraction_method = config["extraction_method"]
if self.extraction_method == "normalize":
#:dict: arguments for the extraction
self.extraction_kwargs = {
"extraction_width": config["extraction_width"],
"lambda_sf": config["smooth_slitfunction"],
"lambda_sp": config["smooth_spectrum"],
"osample": config["oversampling"],
"swath_width": config["swath_width"],
"sigma_cutoff": config["extraction_cutoff"],
"maxiter": config["maxiter"],
}
else:
raise ValueError(
f"Extraction method {self.extraction_method} not supported for step 'norm_flat'"
)
#:int: Threshold of the normalized flat field (values below this are just 1)
self.threshold = config["threshold"]
self.threshold_lower = config["threshold_lower"]
@property
def savefile(self):
"""str: Name of the blaze file"""
return join(self.output_dir, self.prefix + ".flat_norm.npz")
[docs] def run(self, flat, orders, scatter, curvature):
"""Calculate the 'normalized' flat field
Parameters
----------
flat : tuple(array, header)
Master flat, and its FITS header
orders : tuple(array, array)
Polynomial coefficients for each order, and the first and last(+1) column containing signal
Returns
-------
norm : array of shape (nrow, ncol)
normalized flat field
blaze : array of shape (nord, ncol)
Continuum level as determined from the flat field for each order
"""
flat, fhead = flat
orders, column_range = orders
tilt, shear = curvature
# if threshold is smaller than 1, assume percentage value is given
if self.threshold <= 1:
threshold = np.percentile(flat, self.threshold * 100)
else:
threshold = self.threshold
norm, _, blaze, _ = extract(
flat,
orders,
gain=fhead["e_gain"],
readnoise=fhead["e_readn"],
dark=fhead["e_drk"],
order_range=self.order_range,
column_range=column_range,
scatter=scatter,
threshold=threshold,
threshold_lower=self.threshold_lower,
extraction_type=self.extraction_method,
tilt=tilt,
shear=shear,
plot=self.plot,
plot_title=self.plot_title,
**self.extraction_kwargs,
)
blaze = np.ma.filled(blaze, 0)
norm = np.nan_to_num(norm, nan=1)
self.save(norm, blaze)
return norm, blaze
[docs] def save(self, norm, blaze):
"""Save normalized flat field results to disk
Parameters
----------
norm : array of shape (nrow, ncol)
normalized flat field
blaze : array of shape (nord, ncol)
Continuum level as determined from the flat field for each order
"""
np.savez(self.savefile, blaze=blaze, norm=norm)
logger.info("Created normalized flat file: %s", self.savefile)
[docs] def load(self):
"""Load normalized flat field results from disk
Returns
-------
norm : array of shape (nrow, ncol)
normalized flat field
blaze : array of shape (nord, ncol)
Continuum level as determined from the flat field for each order
"""
try:
data = np.load(self.savefile, allow_pickle=True)
logger.info("Normalized flat file: %s", self.savefile)
except FileNotFoundError:
logger.warning(
"No intermediate files found for the normalized flat field. Using flat = 1 instead."
)
data = {"blaze": None, "norm": None}
blaze = data["blaze"]
norm = data["norm"]
return norm, blaze
[docs]class WavelengthCalibrationMaster(CalibrationStep, ExtractionStep):
"""Create wavelength calibration master image"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["norm_flat", "curvature"]
@property
def savefile(self):
"""str: Name of the wavelength echelle file"""
return join(self.output_dir, self.prefix + ".thar_master.fits")
[docs] def run(self, files, orders, mask, curvature, bias, norm_flat):
"""Perform wavelength calibration
This consists of extracting the wavelength image
and fitting a polynomial the the known spectral lines
Parameters
----------
files : list(str)
wavelength calibration files
orders : tuple(array, array)
Polynomial coefficients of each order, and columns with signal of each order
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
wave : array of shape (nord, ncol)
wavelength for each point in the spectrum
thar : array of shape (nrow, ncol)
extracted wavelength calibration image
coef : array of shape (*ndegrees,)
polynomial coefficients of the wavelength fit
linelist : record array of shape (nlines,)
Updated line information for all lines
"""
if len(files) == 0:
raise FileNotFoundError("No files found for wavelength calibration")
logger.info("Wavelength calibration files: %s", files)
# Load wavecal image
orig, thead = self.calibrate(files, mask, bias, norm_flat)
# Extract wavecal spectrum
thar, _, _, _ = self.extract(orig, thead, orders, curvature)
self.save(thar, thead)
return thar, thead
[docs] def save(self, thar, thead):
"""Save the master wavelength calibration to a FITS file
Parameters
----------
thar : array of shape (nrow, ncol)
master flat data
thead : FITS header
master flat header
"""
thar = np.asarray(thar, dtype=np.float64)
fits.writeto(
self.savefile,
data=thar,
header=thead,
overwrite=True,
output_verify="silentfix+ignore",
)
logger.info("Created wavelength calibration spectrum file: %s", self.savefile)
[docs] def load(self):
"""Load master wavelength calibration from disk
Returns
-------
thar : masked array of shape (nrow, ncol)
Master wavecal with bad pixel map applied
thead : FITS header
Master wavecal FITS header
"""
thar = fits.open(self.savefile)[0]
thar, thead = thar.data, thar.header
logger.info("Wavelength calibration spectrum file: %s", self.savefile)
return thar, thead
[docs]class WavelengthCalibrationInitialize(Step):
"""Create the initial wavelength solution file"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["wavecal_master"]
self._loadDependsOn += ["config", "wavecal_master"]
#:tuple(int, int): Polynomial degree of the wavelength calibration in order, column direction
self.degree = config["degree"]
#:float: wavelength range around the initial guess to explore
self.wave_delta = config["wave_delta"]
#:int: number of walkers in the MCMC
self.nwalkers = config["nwalkers"]
#:int: number of steps in the MCMC
self.steps = config["steps"]
#:float: resiudal range to accept as match between peaks and atlas in m/s
self.resid_delta = config["resid_delta"]
#:str: element for the atlas to use
self.element = config["element"]
#:str: medium the medium of the instrument, air or vac
self.medium = config["medium"]
#:float: Gaussian smoothing parameter applied to the observed spectrum in pixel scale, set to 0 to disable smoothing
self.smoothing = config["smoothing"]
#:float: Minimum height of spectral lines in the normalized spectrum, values of 1 and above are interpreted as percentiles of the spectrum, set to 0 to disable the cutoff
self.cutoff = config["cutoff"]
@property
def savefile(self):
"""str: Name of the wavelength echelle file"""
return join(self.output_dir, self.prefix + ".linelist.npz")
[docs] def run(self, wavecal_master):
thar, thead = wavecal_master
# Get the initial wavelength guess from the instrument
wave_range = self.instrument.get_wavelength_range(thead, self.mode)
if wave_range is None:
raise ValueError(
"This instrument is missing an initial wavelength guess for wavecal_init"
)
module = WavelengthCalibrationInitializeModule(
plot=self.plot,
plot_title=self.plot_title,
degree=self.degree,
wave_delta=self.wave_delta,
nwalkers=self.nwalkers,
steps=self.steps,
resid_delta=self.resid_delta,
element=self.element,
medium=self.medium,
smoothing=self.smoothing,
cutoff=self.cutoff,
)
linelist = module.execute(thar, wave_range)
self.save(linelist)
return linelist
[docs] def save(self, linelist):
linelist.save(self.savefile)
logger.info("Created wavelength calibration linelist file: %s", self.savefile)
[docs] def load(self, config, wavecal_master):
thar, thead = wavecal_master
try:
# Try loading the custom reference file
reference = self.savefile
linelist = LineList.load(reference)
except FileNotFoundError:
# If that fails, load the file provided by PyReduce
# It usually fails because we want to use this one
reference = self.instrument.get_wavecal_filename(
thead, self.mode, **config["instrument"]
)
# This should fail if there is no provided file by PyReduce
linelist = LineList.load(reference)
logger.info("Wavelength calibration linelist file: %s", reference)
return linelist
[docs]class WavelengthCalibrationFinalize(Step):
"""Perform wavelength calibration"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["wavecal_master", "wavecal_init"]
#:tuple(int, int): Polynomial degree of the wavelength calibration in order, column direction
self.degree = config["degree"]
#:bool: Whether to use manual alignment instead of cross correlation
self.manual = config["manual"]
#:float: residual threshold in m/s
self.threshold = config["threshold"]
#:int: Number of iterations in the remove lines, auto id cycle
self.iterations = config["iterations"]
#:{'1D', '2D'}: Whether to use 1d or 2d polynomials
self.dimensionality = config["dimensionality"]
#:int: Number of detector offset steps, due to detector design
self.nstep = config["nstep"]
#:float: fraction of columns, to allow individual orders to shift
self.shift_window = config["shift_window"]
#:str: elements of the spectral lamp
self.element = config["element"]
#:str: medium of the detector, vac or air
self.medium = config["medium"]
@property
def savefile(self):
"""str: Name of the wavelength echelle file"""
return join(self.output_dir, self.prefix + ".thar.npz")
[docs] def run(self, wavecal_master, wavecal_init):
"""Perform wavelength calibration
This consists of extracting the wavelength image
and fitting a polynomial the the known spectral lines
Parameters
----------
wavecal_master : tuple
results of the wavecal_master step, containing the master wavecal image
and its header
wavecal_init : LineList
the initial LineList guess with the positions and wavelengths of lines
Returns
-------
wave : array of shape (nord, ncol)
wavelength for each point in the spectrum
coef : array of shape (*ndegrees,)
polynomial coefficients of the wavelength fit
linelist : record array of shape (nlines,)
Updated line information for all lines
"""
thar, thead = wavecal_master
linelist = wavecal_init
module = WavelengthCalibrationModule(
plot=self.plot,
plot_title=self.plot_title,
manual=self.manual,
degree=self.degree,
threshold=self.threshold,
iterations=self.iterations,
dimensionality=self.dimensionality,
nstep=self.nstep,
shift_window=self.shift_window,
element=self.element,
medium=self.medium,
)
wave, coef = module.execute(thar, linelist)
self.save(wave, coef, linelist)
return wave, coef, linelist
[docs] def save(self, wave, coef, linelist):
"""Save the results of the wavelength calibration
Parameters
----------
wave : array of shape (nord, ncol)
wavelength for each point in the spectrum
coef : array of shape (ndegrees,)
polynomial coefficients of the wavelength fit
linelist : record array of shape (nlines,)
Updated line information for all lines
"""
np.savez(self.savefile, wave=wave, coef=coef, linelist=linelist)
logger.info("Created wavelength calibration file: %s", self.savefile)
[docs] def load(self):
"""Load the results of the wavelength calibration
Returns
-------
wave : array of shape (nord, ncol)
wavelength for each point in the spectrum
coef : array of shape (*ndegrees,)
polynomial coefficients of the wavelength fit
linelist : record array of shape (nlines,)
Updated line information for all lines
"""
data = np.load(self.savefile, allow_pickle=True)
logger.info("Wavelength calibration file: %s", self.savefile)
wave = data["wave"]
coef = data["coef"]
linelist = data["linelist"]
return wave, coef, linelist
[docs]class LaserFrequencyCombMaster(CalibrationStep, ExtractionStep):
"""Create a laser frequency comb (or similar) master image"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["norm_flat", "curvature"]
@property
def savefile(self):
"""str: Name of the wavelength echelle file"""
return join(self.output_dir, self.prefix + ".comb_master.fits")
[docs] def run(self, files, orders, mask, curvature, bias, norm_flat):
"""Improve the wavelength calibration with a laser frequency comb (or similar)
Parameters
----------
files : list(str)
observation files
orders : tuple
results from the order tracing step
mask : array of shape (nrow, ncol)
Bad pixel mask
curvature : tuple
results from the curvature step
bias : tuple
results from the bias step
Returns
-------
comb : array of shape (nord, ncol)
extracted frequency comb image
chead : Header
FITS header of the combined image
"""
if len(files) == 0:
raise FileNotFoundError("No files for Laser Frequency Comb found")
logger.info("Frequency comb files: %s", files)
# Combine the input files and calibrate
orig, chead = self.calibrate(files, mask, bias, norm_flat)
# Extract the spectrum
comb, _, _, _ = self.extract(orig, chead, orders, curvature)
self.save(comb, chead)
return comb, chead
[docs] def save(self, comb, chead):
"""Save the master comb to a FITS file
Parameters
----------
comb : array of shape (nrow, ncol)
master comb data
chead : FITS header
master comb header
"""
comb = np.asarray(comb, dtype=np.float64)
fits.writeto(
self.savefile,
data=comb,
header=chead,
overwrite=True,
output_verify="silentfix+ignore",
)
logger.info("Created frequency comb master spectrum: %s", self.savefile)
[docs] def load(self):
"""Load master comb from disk
Returns
-------
comb : masked array of shape (nrow, ncol)
Master comb with bad pixel map applied
chead : FITS header
Master comb FITS header
"""
comb = fits.open(self.savefile)[0]
comb, chead = comb.data, comb.header
logger.info("Frequency comb master spectrum: %s", self.savefile)
return comb, chead
[docs]class LaserFrequencyCombFinalize(Step):
"""Improve the precision of the wavelength calibration with a laser frequency comb"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["freq_comb_master", "wavecal"]
self._loadDependsOn += ["wavecal"]
#:tuple(int, int): polynomial degree of the wavelength fit
self.degree = config["degree"]
#:float: residual threshold in m/s above which to remove lines
self.threshold = config["threshold"]
#:{'1D', '2D'}: Whether to use 1D or 2D polynomials
self.dimensionality = config["dimensionality"]
self.nstep = config["nstep"]
#:int: Width of the peaks for finding them in the spectrum
self.lfc_peak_width = config["lfc_peak_width"]
@property
def savefile(self):
"""str: Name of the wavelength echelle file"""
return join(self.output_dir, self.prefix + ".comb.npz")
[docs] def run(self, freq_comb_master, wavecal):
"""Improve the wavelength calibration with a laser frequency comb (or similar)
Parameters
----------
files : list(str)
observation files
wavecal : tuple()
results from the wavelength calibration step
orders : tuple
results from the order tracing step
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
wave : array of shape (nord, ncol)
improved wavelength solution
comb : array of shape (nord, ncol)
extracted frequency comb image
"""
comb, chead = freq_comb_master
wave, coef, linelist = wavecal
module = WavelengthCalibrationComb(
plot=self.plot,
plot_title=self.plot_title,
degree=self.degree,
threshold=self.threshold,
dimensionality=self.dimensionality,
nstep=self.nstep,
lfc_peak_width=self.lfc_peak_width,
)
wave = module.execute(comb, wave, linelist)
self.save(wave)
return wave
[docs] def save(self, wave):
"""Save the results of the frequency comb improvement
Parameters
----------
wave : array of shape (nord, ncol)
improved wavelength solution
"""
np.savez(self.savefile, wave=wave)
logger.info("Created frequency comb wavecal file: %s", self.savefile)
[docs] def load(self, wavecal):
"""Load the results of the frequency comb improvement if possible,
otherwise just use the normal wavelength solution
Parameters
----------
wavecal : tuple
results from the wavelength calibration step
Returns
-------
wave : array of shape (nord, ncol)
improved wavelength solution
comb : array of shape (nord, ncol)
extracted frequency comb image
"""
try:
data = np.load(self.savefile, allow_pickle=True)
logger.info("Frequency comb wavecal file: %s", self.savefile)
except FileNotFoundError:
logger.warning(
"No data for Laser Frequency Comb found, using regular wavelength calibration instead"
)
wave, coef, linelist = wavecal
data = {"wave": wave}
wave = data["wave"]
return wave
[docs]class SlitCurvatureDetermination(CalibrationStep, ExtractionStep):
"""Determine the curvature of the slit"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
#:float: how many sigma of bad lines to cut away
self.sigma_cutoff = config["curvature_cutoff"]
#:float: width of the orders in the extraction
self.extraction_width = config["extraction_width"]
#:int: Polynomial degree of the overall fit
self.fit_degree = config["degree"]
#:int: Orders of the curvature to fit, currently supports only 1 and 2
self.curv_degree = config["curv_degree"]
#:{'1D', '2D'}: Whether to use 1d or 2d polynomials
self.curvature_mode = config["dimensionality"]
#:float: peak finding noise threshold
self.peak_threshold = config["peak_threshold"]
#:int: peak width
self.peak_width = config["peak_width"]
#:float: window width to search for peak in each row
self.window_width = config["window_width"]
#:str: Function shape that is fit to individual peaks
self.peak_function = config["peak_function"]
@property
def savefile(self):
"""str: Name of the tilt/shear save file"""
return join(self.output_dir, self.prefix + ".shear.npz")
[docs] def run(self, files, orders, mask, bias):
"""Determine the curvature of the slit
Parameters
----------
files : list(str)
files to use for this
orders : tuple
results of the order tracing
mask : array of shape (nrow, ncol)
Bad pixel mask
Returns
-------
tilt : array of shape (nord, ncol)
first order slit curvature at each point
shear : array of shape (nord, ncol)
second order slit curvature at each point
"""
logger.info("Slit curvature files: %s", files)
orig, thead = self.calibrate(files, mask, bias, None)
extracted, _, _, _ = self.extract(orig, thead, orders, None)
orders, column_range = orders
module = CurvatureModule(
orders,
column_range=column_range,
extraction_width=self.extraction_width,
order_range=self.order_range,
fit_degree=self.fit_degree,
curv_degree=self.curv_degree,
sigma_cutoff=self.sigma_cutoff,
mode=self.curvature_mode,
peak_threshold=self.peak_threshold,
peak_width=self.peak_width,
window_width=self.window_width,
peak_function=self.peak_function,
plot=self.plot,
plot_title=self.plot_title,
)
tilt, shear = module.execute(extracted, orig)
self.save(tilt, shear)
return tilt, shear
[docs] def save(self, tilt, shear):
"""Save results from the curvature
Parameters
----------
tilt : array of shape (nord, ncol)
first order slit curvature at each point
shear : array of shape (nord, ncol)
second order slit curvature at each point
"""
np.savez(self.savefile, tilt=tilt, shear=shear)
logger.info("Created slit curvature file: %s", self.savefile)
[docs] def load(self):
"""Load the curvature if possible, otherwise return None, None, i.e. use vertical extraction
Returns
-------
tilt : array of shape (nord, ncol)
first order slit curvature at each point
shear : array of shape (nord, ncol)
second order slit curvature at each point
"""
try:
data = np.load(self.savefile, allow_pickle=True)
logger.info("Slit curvature file: %s", self.savefile)
except FileNotFoundError:
logger.warning("No data for slit curvature found, setting it to 0.")
data = {"tilt": None, "shear": None}
tilt = data["tilt"]
shear = data["shear"]
return tilt, shear
[docs]class RectifyImage(Step):
"""Create a 2D image of the rectified orders"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["files", "orders", "curvature", "mask", "freq_comb_final"]
# self._loadDependsOn += []
self.extraction_width = config["extraction_width"]
self.input_files = config["input_files"]
[docs] def filename(self, name):
return util.swap_extension(name, ".rectify.fits", path=self.output_dir)
[docs] def run(self, files, orders, curvature, mask, freq_comb_final):
orders, column_range = orders
tilt, shear = curvature
wave = freq_comb_final
files = files[self.input_files]
rectified = {}
for fname in tqdm(files, desc="Files"):
img, head = self.instrument.load_fits(
fname, self.mode, mask=mask, dtype="f8"
)
images, cr, xwd = rectify_image(
img,
orders,
column_range,
self.extraction_width,
self.order_range,
tilt,
shear,
)
wavelength, image = merge_images(images, wave, cr, xwd)
self.save(fname, image, wavelength, header=head)
rectified[fname] = (wavelength, image)
return rectified
[docs] def save(self, fname, image, wavelength, header=None):
# Change filename
fname = self.filename(fname)
# Create HDU List, one extension per order
primary = fits.PrimaryHDU(header=header)
secondary = fits.ImageHDU(data=image)
column = fits.Column(name="wavelength", array=wavelength, format="D")
tertiary = fits.BinTableHDU.from_columns([column])
hdus = fits.HDUList([primary, secondary, tertiary])
# Save data to file
hdus.writeto(fname, overwrite=True, output_verify="silentfix")
[docs] def load(self, files):
files = files[self.input_files]
rectified = {}
for orig_fname in files:
fname = self.filename(orig_fname)
data = fits.open(fname)
img = data[1].data
wave = data[2].data["wavelength"]
rectified[orig_fname] = (wave, img)
return rectified
[docs]class ContinuumNormalization(Step):
"""Determine the continuum to each observation"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["science", "freq_comb", "norm_flat"]
self._loadDependsOn += ["norm_flat", "science"]
@property
def savefile(self):
"""str: savefile name"""
return join(self.output_dir, self.prefix + ".cont.npz")
[docs] def run(self, science, freq_comb, norm_flat):
"""Determine the continuum to each observation
Also splices the orders together
Parameters
----------
science : tuple
results from science step
freq_comb_final : tuple
results from freq_comb_final step (or wavecal if those don't exist)
norm_flat : tuple
results from the normalized flatfield step
Returns
-------
heads : list(FITS header)
FITS headers of each observation
specs : list(array of shape (nord, ncol))
extracted spectra
sigmas : list(array of shape (nord, ncol))
uncertainties of the extracted spectra
conts : list(array of shape (nord, ncol))
continuum for each spectrum
columns : list(array of shape (nord, 2))
column ranges for each spectra
"""
wave = freq_comb
heads, specs, sigmas, columns = science
norm, blaze = norm_flat
logger.info("Continuum normalization")
conts = [None for _ in specs]
for j, (spec, sigma) in enumerate(zip(specs, sigmas)):
logger.info("Splicing orders")
specs[j], wave, blaze, sigmas[j] = splice_orders(
spec,
wave,
blaze,
sigma,
scaling=True,
plot=self.plot,
plot_title=self.plot_title,
)
logger.info("Normalizing continuum")
conts[j] = continuum_normalize(
specs[j],
wave,
blaze,
sigmas[j],
plot=self.plot,
plot_title=self.plot_title,
)
self.save(heads, specs, sigmas, conts, columns)
return heads, specs, sigmas, conts, columns
[docs] def save(self, heads, specs, sigmas, conts, columns):
"""Save the results from the continuum normalization
Parameters
----------
heads : list(FITS header)
FITS headers of each observation
specs : list(array of shape (nord, ncol))
extracted spectra
sigmas : list(array of shape (nord, ncol))
uncertainties of the extracted spectra
conts : list(array of shape (nord, ncol))
continuum for each spectrum
columns : list(array of shape (nord, 2))
column ranges for each spectra
"""
value = {
"heads": heads,
"specs": specs,
"sigmas": sigmas,
"conts": conts,
"columns": columns,
}
joblib.dump(value, self.savefile)
logger.info("Created continuum normalization file: %s", self.savefile)
[docs] def load(self, norm_flat, science):
"""Load the results from the continuum normalization
Returns
-------
heads : list(FITS header)
FITS headers of each observation
specs : list(array of shape (nord, ncol))
extracted spectra
sigmas : list(array of shape (nord, ncol))
uncertainties of the extracted spectra
conts : list(array of shape (nord, ncol))
continuum for each spectrum
columns : list(array of shape (nord, 2))
column ranges for each spectra
"""
try:
data = joblib.load(self.savefile)
logger.info("Continuum normalization file: %s", self.savefile)
except FileNotFoundError:
# Use science files instead
logger.warning(
"No continuum normalized data found. Using unnormalized results instead."
)
heads, specs, sigmas, columns = science
norm, blaze = norm_flat
conts = [blaze for _ in specs]
data = dict(
heads=heads, specs=specs, sigmas=sigmas, conts=conts, columns=columns
)
heads = data["heads"]
specs = data["specs"]
sigmas = data["sigmas"]
conts = data["conts"]
columns = data["columns"]
return heads, specs, sigmas, conts, columns
[docs]class Finalize(Step):
"""Create the final output files"""
def __init__(self, *args, **config):
super().__init__(*args, **config)
self._dependsOn += ["continuum", "freq_comb_final", "config"]
self.filename = config["filename"]
[docs] def output_file(self, number, name):
"""str: output file name"""
out = self.filename.format(
instrument=self.instrument.name,
night=self.night,
mode=self.mode,
number=number,
input=name,
)
return join(self.output_dir, out)
[docs] def run(self, continuum, freq_comb_final, config):
"""Create the final output files
this is includes:
- heliocentric corrections
- creating one echelle file
Parameters
----------
continuum : tuple
results from the continuum normalization
freq_comb_final : tuple
results from the frequency comb step (or wavelength calibration)
"""
heads, specs, sigmas, conts, columns = continuum
wave = freq_comb_final
fnames = []
# Combine science with wavecal and continuum
for i, (head, spec, sigma, blaze, column) in enumerate(
zip(heads, specs, sigmas, conts, columns)
):
head["e_erscle"] = ("absolute", "error scale")
# Add heliocentric correction
try:
rv_corr, bjd = util.helcorr(
head["e_obslon"],
head["e_obslat"],
head["e_obsalt"],
head["e_ra"],
head["e_dec"],
head["e_jd"],
)
logger.debug("Heliocentric correction: %f km/s", rv_corr)
logger.debug("Heliocentric Julian Date: %s", str(bjd))
except KeyError:
logger.warning("Could not calculate heliocentric correction")
# logger.warning("Telescope is in space?")
rv_corr = 0
bjd = head["e_jd"]
head["barycorr"] = rv_corr
head["e_jd"] = bjd
head["HIERARCH PR_version"] = __version__
head = self.save_config_to_header(head, config)
if self.plot:
plt.plot(wave.T, (spec / blaze).T)
if self.plot_title is not None:
plt.title(self.plot_title)
plt.show()
fname = self.save(i, head, spec, sigma, blaze, wave, column)
fnames.append(fname)
return fnames
[docs] def save(self, i, head, spec, sigma, cont, wave, columns):
"""Save one output spectrum to disk
Parameters
----------
i : int
individual number of each file
head : FITS header
FITS header
spec : array of shape (nord, ncol)
final spectrum
sigma : array of shape (nord, ncol)
final uncertainties
cont : array of shape (nord, ncol)
final continuum scales
wave : array of shape (nord, ncol)
wavelength solution
columns : array of shape (nord, 2)
columns that carry signal
Returns
-------
out_file : str
name of the output file
"""
original_name = os.path.splitext(head["e_input"])[0]
out_file = self.output_file(i, original_name)
echelle.save(
out_file, head, spec=spec, sig=sigma, cont=cont, wave=wave, columns=columns
)
logger.info("Final science file: %s", out_file)
return out_file
[docs]class Reducer:
step_order = {
"bias": 10,
"flat": 20,
"orders": 30,
"curvature": 40,
"scatter": 45,
"norm_flat": 50,
"wavecal_master": 60,
"wavecal_init": 64,
"wavecal": 67,
"freq_comb_master": 70,
"freq_comb": 72,
"rectify": 75,
"science": 80,
"continuum": 90,
"finalize": 100,
}
modules = {
"mask": Mask,
"bias": Bias,
"flat": Flat,
"orders": OrderTracing,
"scatter": BackgroundScatter,
"norm_flat": NormalizeFlatField,
"wavecal_master": WavelengthCalibrationMaster,
"wavecal_init": WavelengthCalibrationInitialize,
"wavecal": WavelengthCalibrationFinalize,
"freq_comb_master": LaserFrequencyCombMaster,
"freq_comb": LaserFrequencyCombFinalize,
"curvature": SlitCurvatureDetermination,
"science": ScienceExtraction,
"continuum": ContinuumNormalization,
"finalize": Finalize,
"rectify": RectifyImage,
}
def __init__(
self,
files,
output_dir,
target,
instrument,
mode,
night,
config,
order_range=None,
skip_existing=False,
):
"""Reduce all observations from a single night and instrument mode
Parameters
----------
files: dict{str:str}
Data files for each step
output_dir : str
directory to place output files in
target : str
observed targets as used in directory names/fits headers
instrument : str
instrument used for observations
mode : str
instrument mode used (e.g. "red" or "blue" for HARPS)
night : str
Observation night, in the same format as used in the directory structure/file sorting
config : dict
numeric reduction specific settings, like pixel threshold, which may change between runs
info : dict
fixed instrument specific values, usually header keywords for gain, readnoise, etc.
skip_existing : bool
Whether to skip reductions with existing output
"""
#:dict(str:str): Filenames sorted by usecase
self.files = files
self.output_dir = output_dir.format(
instrument=str(instrument), target=target, night=night, mode=mode
)
if isinstance(instrument, str):
instrument = load_instrument(instrument)
self.data = {"files": files, "config": config}
self.inputs = (instrument, mode, target, night, output_dir, order_range)
self.config = config
self.skip_existing = skip_existing
[docs] def run_module(self, step, load=False):
# The Module this step is based on (An object of the Step class)
module = self.modules[step](*self.inputs, **self.config.get(step, {}))
# Load the dependencies necessary for loading/running this step
dependencies = module.dependsOn if not load else module.loadDependsOn
for dependency in dependencies:
if dependency not in self.data.keys():
self.data[dependency] = self.run_module(dependency, load=True)
args = {d: self.data[d] for d in dependencies}
# Try to load the data, if the step is not specifically given as necessary
# If the intermediate data is not available, run it normally instead
# But give a warning
if load:
try:
logger.info("Loading data from step '%s'", step)
data = module.load(**args)
except FileNotFoundError:
logger.warning(
"Intermediate File(s) for loading step %s not found. Running it instead.",
step,
)
data = self.run_module(step, load=False)
else:
logger.info("Running step '%s'", step)
if step in self.files.keys():
args["files"] = self.files[step]
data = module.run(**args)
self.data[step] = data
return data
[docs] def prepare_output_dir(self):
# create output folder structure if necessary
output_dir = self.output_dir
if not os.path.exists(output_dir):
os.makedirs(output_dir)
[docs] def run_steps(self, steps="all"):
"""
Execute the steps as required
Parameters
----------
steps : {tuple(str), "all"}, optional
which steps of the reduction process to perform
the possible steps are: "bias", "flat", "orders", "norm_flat", "wavecal", "freq_comb",
"curvature", "science", "continuum", "finalize"
alternatively set steps to "all", which is equivalent to setting all steps
"""
self.prepare_output_dir()
if steps == "all":
steps = list(self.step_order.keys())
steps = list(steps)
if self.skip_existing and "finalize" in steps:
module = self.modules["finalize"](
*self.inputs, **self.config.get("finalize", {})
)
exists = [False] * len(self.files["science"])
data = {"finalize": [None] * len(self.files["science"])}
for i, f in enumerate(self.files["science"]):
fname_in = os.path.basename(f)
fname_in = os.path.splitext(fname_in)[0]
fname_out = module.output_file("?", fname_in)
fname_out = glob.glob(fname_out)
exists[i] = len(fname_out) != 0
if exists[i]:
data["finalize"][i] = fname_out[0]
if all(exists):
logger.info("All science files already exist, skipping this set")
logger.debug("--------------------------------")
return data
steps.sort(key=lambda x: self.step_order[x])
for step in steps:
self.run_module(step)
logger.debug("--------------------------------")
return self.data