Source code for pyreduce.instruments.filters

import logging
import re
from datetime import datetime
from fnmatch import fnmatch

import numpy as np
from astropy import units as u
from astropy.time import Time
from dateutil import parser

logger = logging.getLogger(__name__)


[docs] class Filter: def __init__( self, keyword, dtype="U20", wildcards=False, regex=False, flags=0, unique=True, ignorecase=True, ): self.keyword = keyword self.dtype = dtype self.wildcards = wildcards self.regex = regex self.flags = flags self.data = [] self.unique = unique self.ignorecase = ignorecase if self.ignorecase and not self.flags & re.IGNORECASE: self.flags += re.IGNORECASE def _collect_value(self, header): if self.keyword is None: value = "" elif "{" in self.keyword: kws = re.findall(r"{([^{}]+)}", self.keyword) values = {kw: header.get(kw, "") for kw in kws} value = self.keyword.format(**values) else: value = header.get(self.keyword) if value.__class__ == header.__class__: if len(value) > 0: value = value[0] else: value = "" return value
[docs] def collect(self, header): value = self._collect_value(header) self.data.append(value) return value
[docs] def match(self, value): if self.keyword is None: result = np.full(len(self.data), False) else: try: if self.regex: regex = re.compile(f"^(?:{value})$", flags=self.flags) elif self.wildcards: regex = re.compile(fnmatch.translate(value), flags=self.flags) else: regex = re.compile(value, flags=self.flags) result = [ regex.match(f) is not None if f is not None else False for f in self.data ] except TypeError: result = [f == value for f in self.data] result = np.asarray(result, dtype=bool) return result
[docs] def classify(self, value): if self.unique: if value is not None and value != "": match = self.match(value) data = np.asarray(self.data) data = np.unique(data[match]) else: data = set(self.data) data = [(d, self.match(d)) for d in data] else: if value is not None and value != "": match = self.match(value) else: match = np.full(len(self.data), True) data = [(value, match)] return data
[docs] def clear(self): self.data = []
[docs] class InstrumentFilter(Filter): def __init__(self, keyword="INSTRUME", **kwargs): kwargs["dtype"] = "U20" kwargs["unique"] = False super().__init__(keyword, **kwargs)
[docs] class ObjectFilter(Filter): def __init__(self, keyword="OBJECT", **kwargs): kwargs["dtype"] = "U20" kwargs["unique"] = False super().__init__(keyword, **kwargs)
[docs] class NightFilter(Filter): def __init__( self, keyword="DATE-OBS", timeformat="fits", timezone="utc", timezone_local=None, **kwargs, ): super().__init__(keyword, dtype=datetime, **kwargs) self.timeformat = timeformat self.timezone = timezone self.timezone_local = timezone_local
[docs] @staticmethod def observation_date_to_night(observation_date): """Convert an observation timestamp into the date of the observation night Nights start at 12am and end at 12 am the next day """ if observation_date.to_datetime().hour < 12: observation_date -= 1 * u.day return observation_date.to_datetime().date()
[docs] def collect(self, header): value = super()._collect_value(header) if value is not None: try: value = Time(value, format=self.timeformat, scale=self.timezone) value = self.observation_date_to_night(value) except ValueError: logger.warning( "Could not determine the observation date of %s, skipping it", header, ) else: logger.warning( "Could not determine the observation date of %s, skipping it", header ) self.data.append(value) return value
[docs] def match(self, value): try: value = parser.parse(value).date() except Exception: pass match = super().match(value) return match
[docs] class ChannelFilter(Filter): def __init__( self, keyword, dtype="U20", wildcards=False, regex=False, flags=0, unique=True, ignorecase=True, replacement=None, ): if replacement is None: replacement = {} super().__init__( keyword, dtype=dtype, wildcards=wildcards, regex=regex, flags=flags, unique=unique, ignorecase=ignorecase, ) self.replacement = replacement
[docs] def classify(self, value): data = super().classify(value) data = [ (self.replacement[d] if d in self.replacement.keys() else d, m) for d, m in data ] return data