Source code for pst.observables

"""
This module contains some tools for computing observable quantities
(e.g. photometry, equivalent widths) from spectra.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, List, Optional, Sequence, Tuple, Union

import numpy as np
import os
from astropy import units as u
from astropy import constants as const
from astropy import constants
import requests
import json
from matplotlib import pyplot as plt

from pst.utils import check_unit, flux_conserving_interpolation

ArrayLike = Union[np.ndarray, u.Quantity]
PST_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")

[docs] def list_of_available_filters(): """List the currently available filters in the default directory.""" filter_dir = os.path.join(PST_DATA_DIR, "filters") return os.listdir(filter_dir)
[docs] def load_photometric_filters(filters, to_filter_list=False): """Convenience function for constructing a list of photometric filters. Parameters ---------- filters: list of str List of filters to load. The list might contain the absolute path to a filter response file, or just the filter name following the SVO convention. Returns ------- filters_out: list of :class:`pst.observables.Filter` List of filters. """ filters_out = [] for f in filters: if os.path.exists(f): filters_out.append(Filter.from_text_file(f)) else: filters_out.append(Filter.from_svo(f)) if to_filter_list: return FilterList(filters_out) return filters_out
[docs] def download_svo_filter(name: str, dest_dir: str, verbose=True, retry=3): """Download a filter from the Spanish Virtual Observatory (SVO) Filter Profile Service. Parameters ---------- name : str SVO-compliant filename. The naming convention for SVO filters is TELESC_INSTRUMENT.BAND (e.g. WISE_WISE.W1, Subaru_HSC.g) dest_dir : str Path to the directory where to store the data. Returns ------- file_path : str Path to the downloaded filter file. """ if not os.path.exists(dest_dir): os.makedirs(dest_dir) # create folder if it does not exist name = name.strip(".dat") base_url="http://svo2.cab.inta-csic.es/theory/fps/getdata.php?format=ascii&id=" url = base_url + name.replace("_", "/") filename = name + ".dat" file_path = os.path.join(dest_dir, filename) if verbose: print(f"Querying SVO Filter: {url}") try: r = requests.get(url, stream=True, timeout=30.0) except requests.exceptions.ConnectTimeout as e: if retry: print(f"Connection timed out. Retrying... ({retry} attempts left)") download_svo_filter(name, dest_dir, verbose=verbose, retry=retry-1) else: raise e if len(r.text) > 0: if verbose: print(f"Saving new filter {name} to ", os.path.abspath(file_path)) with open(file_path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024 * 8): if chunk: f.write(chunk) f.flush() os.fsync(f.fileno()) return file_path else: raise FileNotFoundError("Query to {url} was unsucessful")
[docs] class Filter(object): """A photometric filter. Attributes ---------- filter_resp: np.ndarray Original photometric passband response curve. filter_wavelength : :class:`numpy.ndarray` or :class:`astropy.units.Quantity` Original wavelength associated to ``filter_resp``. If type is ``numpy.ndarray``, the value is converted in to an ``astropy.units.Quantity` in Angstrom. response: np.ndarray Filter passband response curve after interpolation. wavelength: :class:`numpy.ndarray` or :class:`astropy.units.Quantity` Wavelength vector associated to ``response``. If type is ``numpy.ndarray``, the value is converted in to an ``astropy.units.Quantity` in Angstrom. default_dir : str Default directory containing filter files. Example ------- >>> from pst.observables import Filter >>> ps_r_filter = Filter("PANSTARRS_PS1.r") >>> wl = np.linspace(5000, 7000, 1000) * u.angstrom >>> ps_r_filter.interpolate(wl) >>> ps_r_filter.plot(add_props=True, show=True) """ default_dir = os.path.join(PST_DATA_DIR, "filters") def __init__(self, wavelength=None, response=None, filter_wavelength=None, filter_response=None, name=None): self.wavelength = wavelength self.response = response self.filter_wavelength = filter_wavelength self.filter_resp = filter_response if filter_wavelength is None and filter_response is None and wavelength is not None: self.filter_wavelength = self.wavelength self.filter_resp = self.response elif filter_wavelength is None and wavelength is None: raise NameError("wavelength or filter_wavelength must be provided") if self.wavelength is not None: if self.response is None: self.interpolate(self.wavelength) else: self.norm_photons = self.get_photons( 3631 * u.Jy * np.ones(self.wavelength.shape) * constants.c / self.wavelength**2, mask_nan=False)[0] else: self.norm_photons = None self.name = name @property def wavelength(self): """Interpolated wavelength grid used by ``response``.""" return self._wavelength @wavelength.setter def wavelength(self, value): if not isinstance(value, u.Quantity) and value is not None: self._wavelength = value << u.angstrom else: self._wavelength = value @property def filter_wavelength(self): """Native wavelength grid associated with ``filter_resp``.""" return self._filter_wavelength @filter_wavelength.setter def filter_wavelength(self, value): if not isinstance(value, u.Quantity) and value is not None: self._filter_wavelength = value << u.angstrom else: self._filter_wavelength = value
[docs] @classmethod def from_text_file(cls, path, wavelength_unit=u.angstrom, **kwargs): """Load a :class:``Filter`` from an input text file. Parameters --------- path : str Path to the text file containing the filter information. The first and second columns must correspond to the wavelength and passband curve, respectively. wavelength_unit : :class:``astropy.units.Unit`` Unit associated with wavelength values in the input file. **kwargs : Arguments to be passed to :func:`numpy.loadtxt` Returns ------- filter : :class:`Filter` The ``Filter`` containing the input information. """ wavelength, response = np.loadtxt(path, usecols=(0, 1), unpack=True, **kwargs) name = os.path.basename(path).replace(".dat", "") return cls(filter_wavelength=wavelength * wavelength_unit, filter_response=response, name=name)
[docs] @classmethod def from_svo(cls, name, destination_dir=None): """Load a :class:``Filter`` from the Spanish Vitural Observatory archive. Parameters --------- name : str SVO filter name. If the filter is not found locally, it will be downloaded from the archive. Returns ------- filter : :class:`Filter` The ``Filter`` containing the input information. Example ------- >>> from pst.observables import Filter >>> panstarrs_r_filter = Filter.from_svo("PANSTARRS_PS1.r") """ if destination_dir is None: destination_dir = cls.default_dir path = cls._isfilter(name) if path is not None: return cls.from_text_file(path) else: path = download_svo_filter(name, dest_dir=destination_dir) return cls.from_text_file(path)
@classmethod def _isfilter(cls, name): path = os.path.join(cls.default_dir, name.strip(".dat") + ".dat") if os.path.isfile(path): return path else: return None
[docs] def effective_wavelength(self): r"""Compute the effective wavelength of the filter. Description ----------- The effective wavelength is computed as .. math:: \lambda_{\rm eff} = \frac{\int{R(\lambda) \cdot \lambda d\lambda}}{\int{R(\lambda) d\lambda}} Returns ------- eff_wl : :class:`astropy.units.Quantity` The effective wavelength of the filter. """ return np.sum(self.filter_wavelength*self.filter_resp)/np.sum(self.filter_resp)
[docs] def effective_bandwidth(self): r"""Compute the effective bandwidth of the filter. Description ----------- The effective bandwith is computed as .. math:: \Delta \lambda_{\rm BW} = \sqrt{8\log(2)} \left(\frac{\int{R(\lambda) \cdot \lambda^2 d\lambda}}{\int{R(\lambda) d\lambda}} - \lambda_{\rm eff}\right)^{1/2} Returns ------- eff_bw : :class:`astropy.units.Quantity` The effective bandwidth of the filter. See also -------- :func:`effective_wavelength` """ return np.sqrt(8*np.log(2)*( np.sum(self.filter_wavelength**2*self.filter_resp)/np.sum(self.filter_resp) - self.effective_wavelength()**2))
[docs] def effective_transmission(self): r"""Compute the effective bandwidth of the filter. Description ----------- The effective transmission is computed as .. math:: R_{\rm eff} = \frac{\int{R(\lambda)^2 d\lambda}}{\int{R(\lambda) d\lambda}} Returns ------- eff_tr : float The effective transmission of the filter. """ return np.sum(self.filter_resp**2)/np.sum(self.filter_resp)
[docs] def interpolate(self, wavelength=None): """Interpolate and update the Filter response curve to an input wavelength. Description ----------- Interpolate linearly the Filter response curve to an input wavelength vector. The result will update the exising values of ``wavelength`` and ``response``. Parameters ---------- wavelength: :class:`numpy.ndarray` or :class:`astropy.units.Quantity` Wavelength vector to interpolate ``filt_resp``. If type is ``numpy.ndarray``, the value is converted in to an ``astropy.units.Quantity` in Angstrom. Returns ------- response : np.ndarray Filter response curve interpolated to the input values of ``wavelength``. """ if not hasattr(wavelength, "unit"): wavelength = wavelength << u.angstrom self.response = flux_conserving_interpolation( wavelength, self.filter_wavelength, self.filter_resp) self.wavelength= wavelength self.norm_photons, _ = self.get_photons( 3631 * u.Jy * np.ones(self.wavelength.shape) * constants.c / self.wavelength**2, mask_nan=False) return self.response
[docs] def get_photons(self, spectra, spectra_err=None, mask_nan=True): r"""Compute the photon flux from an input spectra. Description ----------- The photon flux associated to the filter is computed by numerically integrating the input ``spectra`` with the filter ``response``, using the trapezid method. .. :math: phot_flux = \int{F_\lambda \cdot frac{\lambda}{hc} \cdot R(\lambda) d\lambda} Parameters ---------- spectra : :class:`np.ndarray` or :class:``astropy.units.Quantity`` Input spectra (flux density per wavelength unit) with same dimensions as the Filter ``wavelength``. spectra_err : :class:`np.ndarray` or :class:``astropy.units.Quantity``, optional Input spectra associated error. mask_nan : bool, optional If True, NaN values are masked. Returns ------- photon_flux : :class:``astropy.units.Quantity`` Filter photon flux. photon_flux_err : :class:``astropy.units.Quantity`` Filter photon flux associated error. """ spectra = check_unit(spectra, default_unit=u.Lsun / u.angstrom / u.cm**2, equivalence=u.spectral_density, wav=self.wavelength) if mask_nan: mask = np.isfinite(spectra) photon_flux = np.trapz( spectra[mask] / (constants.h * constants.c / self.wavelength[mask] ) * self.response[mask], x=self.wavelength[mask]) else: photon_flux = np.trapz( spectra / (constants.h * constants.c / self.wavelength ) * self.response, x=self.wavelength) if spectra_err is not None: spectra_err = check_unit(spectra_err, default_unit=u.Lsun / u.angstrom / u.cm**2, equivalence=u.spectral_density, wav=self.wavelength) if mask_nan: mask = mask & np.isfinite(spectra_err) else: mask = np.ones_like(spectra_err, dtype=bool) photon_flux_err = np.trapz( spectra_err[mask] / (constants.h * constants.c / self.wavelength[mask] ) * self.response[mask], x=self.wavelength[mask]) else: photon_flux_err = None return photon_flux, photon_flux_err
[docs] def get_ab(self, spectra, spectra_err=None, mask_nan=True): r"""Compute the synthetic AB magnitude from an input spectra. Description ----------- The AB magnitude associated to the filter is computed by numerically integrating the input ``spectra`` with the filter ``response``, using the trapezid method. .. :math: phot_flux = -2.5 \cdot \log_{10}\left(\frac{N_{phot}(spectra)}{N_{phot}(3631)}\right) Parameters ---------- spectra : :class:`np.ndarray` or :class:``astropy.units.Quantity`` Input spectra (flux density per wavelength unit) with same dimensions as the Filter ``wavelength``. spectra_err : :class:`np.ndarray` or :class:``astropy.units.Quantity``, optional Input spectra associated error. mask_nan : bool, optional If True, NaN values are masked. Returns ------- mag_ab : :class:``astropy.units.Quantity`` AB magnitude. mag_ab_err : :class:``astropy.units.Quantity`` AB magnitude associated error. See also -------- :func:`get_photons` """ n_photons, n_photons_err = self.get_photons(spectra, spectra_err, mask_nan=mask_nan) mag_ab = - 2.5 * np.log10(n_photons / self.norm_photons) if n_photons_err is None: mag_ab_err = None else: mag_ab_err = 2.5 / np.log(10) * n_photons_err / n_photons return mag_ab, mag_ab_err
[docs] def get_fnu(self, spectra, spectra_err=None, mask_nan=True): """Compute synthetic flux density per frequency unit from a spectrum. Parameters ---------- spectra : :class:`np.ndarray` or :class:``astropy.units.Quantity`` Input spectra (flux density per wavelength unit) with same dimensions as the Filter ``wavelength``. spectra_err : :class:`np.ndarray` or :class:``astropy.units.Quantity``, optional Input spectra associated error. mask_nan : bool, optional If True, NaN values are masked. Returns ------- f_nu : :class:``astropy.units.Quantity`` Synthetic flux density in Jy. f_nu_err : :class:``astropy.units.Quantity`` Associated uncertainty in Jy. See also -------- :func:`get_photons` """ n_photons, n_photons_err = self.get_photons(spectra, spectra_err, mask_nan=mask_nan) f_nu = n_photons / self.norm_photons * 3631 * u.Jy if spectra_err is None: f_nu_err = None else: f_nu_err = n_photons_err / self.norm_photons * 3631 * u.Jy return f_nu, f_nu_err
[docs] def get_flambda_vegamag(self, spectra, spectra_err=None, mask_nan=True): """Compute synthetic flux density per wavelength unit from a spectrum. Parameters ---------- spectra : :class:`np.ndarray` or :class:``astropy.units.Quantity`` Input spectra (flux density per wavelength unit) with same dimensions as the Filter ``wavelength``. spectra_err : :class:`np.ndarray` or :class:``astropy.units.Quantity``, optional Input spectra associated error. mask_nan : bool, optional If True, NaN values are masked. Returns ------- f_lambda : :class:``astropy.units.Quantity`` Synthetic flux density per wavelength. f_lambda_err : :class:``astropy.units.Quantity`` Associated uncertainty estimate. See also -------- :func:`get_photons` """ spectra = spectra = check_unit(spectra, default_unit=u.Lsun / u.angstrom / u.cm**2, equivalence=u.spectral_density, wav=self.wavelength) if mask_nan: mask = np.isfinite(spectra) else: mask = np.ones_like(spectra, dtype=bool) f_lambda = np.trapz(spectra[mask] * self.wavelength[mask] * self.response[mask], x=self.wavelength[mask] ) / np.trapz(self.response[mask] * self.wavelength[mask], x=self.wavelength[mask]) if spectra_err is not None: spectra_err = check_unit(spectra_err, default_unit=u.Lsun / u.angstrom / u.cm**2, equivalence=u.spectral_density, wav=self.wavelength) if mask_nan: mask = mask & np.isfinite(spectra_err) else: mask = np.ones_like(spectra_err, dtype=bool) f_lambda_err = np.trapz( spectra_err[mask] / (constants.h * constants.c / self.wavelength[mask] ) * self.response[mask], x=self.wavelength[mask]) else: f_lambda_err = None return f_lambda, f_lambda_err
[docs] def plot(self, add_props=False, ax=None, show=False): """Plot the filter response curve. Plot the original filter response curve together with the interpolated version computed using a new grid of wavelengths. Parameters ---------- add_props: bool If True, add vertical lines indicating the effective wavelength and bandwidth of the filter. Default is False. ax: :class:`matplotlib.axes.Axes`, optional Matplotlib axis to plot on. If None, a new figure and axis are created. show: bool If True, display the plot by calling ``plt.show()``. This requires an interactive matplotlib session. Default is False. """ if ax is None: fig, ax = plt.subplots() else: fig = None ax.step(self.filter_wavelength, self.filter_resp, label='Original', color='k', where="mid") ax.plot(self.filter_wavelength, self.filter_resp, '.', color='k') ax.set_xlabel(f"Wavelength ({self.filter_wavelength.unit})") ax.set_ylabel("Filter response") if self.wavelength is not None: ax.step(self.wavelength, self.response, label='Interpolated', color='r', where="mid") ax.legend() if add_props: eff_wl = self.effective_wavelength() eff_bw = self.effective_bandwidth() ax.axvline(eff_wl.value - eff_bw.value / 2, c="k", ls=":") ax.axvline(eff_wl.value + eff_bw.value / 2, c="k", ls=":") if show: plt.show() else: plt.close() return fig, ax
[docs] class TopHatFilter(Filter): """Top hat photometric filter See also -------- :class:`Filter` """ def __init__(self, central_wave, width, **kwargs): central_wave = check_unit(central_wave, u.Angstrom) width = check_unit(width, u.Angstrom) self.wavelength = kwargs.get('wavelength', None) if self.wavelength is None: self.filter_wavelength = np.linspace(central_wave - width, central_wave + width, 50) else: self.wavelength = check_unit(self.wavelength, u.Angstrom) self.filter_wavelength = self.wavelength.copy() self.filter_resp = np.ones(self.filter_wavelength.size) self.filter_resp[self.filter_wavelength < central_wave - width / 2] = 0 self.filter_resp[self.filter_wavelength > central_wave + width / 2] = 0 if self.wavelength is None: self.response = self.filter_resp.copy() self.interpolate(self.wavelength)
[docs] @dataclass class FilterList: """A list of :class:`Filter` instances for faster computations. #TODO """ filters: List["Filter"] wavelength: Optional[u.Quantity] = None response: Optional[np.ndarray] = None kernel_phot: Optional[u.Quantity] = None dlambda: Optional[u.Quantity] = None norm_phot: Optional[u.Quantity] = None names: Optional[List[str]] = None def __post_init__(self): self.filters = list(self.filters) self.names = [getattr(f, "name", f"band{ib}") for ib, f in enumerate(self.filters)] @property def n_bands(self) -> int: """Number of filters contained in the list.""" return len(self.filters) @property def effective_wavelength(self) -> u.Quantity: """Array of effective wavelengths, one per filter.""" return u.Quantity([f.effective_wavelength() for f in self.filters], u.AA)
[docs] def wavelength_range(self, kappa_bw=2.0) -> [u.Quantity, u.Quantity]: """Get the net wavelength coverage by the filters.""" min_wl = 1e6 << u.AA max_wl = 1 << u.AA for f in self.filters: eff_wl, eff_bw = f.effective_wavelength(), f.effective_bandwidth() low, up = eff_wl - eff_bw * kappa_bw, eff_wl + eff_bw * kappa_bw if low < min_wl: min_wl = low if up > max_wl: max_wl = up return [min_wl, max_wl]
[docs] def interpolate(self, wavelength: ArrayLike) -> "FilterList": """ Interpolate all filter responses to a common wavelength grid. Parameters ---------- wavelength : array or Quantity Target wavelength grid. Must be 1D and monotonic increasing. Returns ------- self """ wl = check_unit(wavelength, u.AA) if wl.ndim != 1: raise ValueError("wavelength must be 1D") if wl.size < 2: raise ValueError("wavelength grid must contain at least two points") # Enforce monotonic increasing for stable integration if np.any(np.diff(wl.to_value(wl.unit)) <= 0): raise ValueError("wavelength grid must be strictly increasing") self.wavelength = wl # Build (B, W) response matrix resp = np.empty((self.n_bands, wl.size), dtype=float) for i, f in enumerate(self.filters): # Use your flux-conserving interpolation (same behavior as Filter.interpolate) resp[i] = flux_conserving_interpolation(wl, f.filter_wavelength, f.filter_resp) self.response = resp # Cache delta_lambda for integral dl = np.empty_like(wl) dl[0] = 0.5 * (wl[1] - wl[0]) dl[-1] = 0.5 * (wl[-1] - wl[-2]) if wl.size > 2: dl[1:-1] = 0.5 * (wl[2:] - wl[:-2]) self.dlambda = dl # Photon kernel: (lambda / (h c)) * R(lambda) self.kernel_phot = (wl / (const.h * const.c)) * resp # AB normalization per band fnu0 = 3631 * u.Jy f_lambda_ref = (fnu0 * const.c / wl**2).to( u.erg / (u.s * u.cm**2 * wl.unit), equivalencies=u.spectral_density(wl), ) self.norm_phot = u.Quantity( np.einsum("i,bi,i->b", f_lambda_ref, self.kernel_phot, self.dlambda), copy=False, ) return self
def _require_interpolated(self): if self.wavelength is None or self.response is None or self.kernel_phot is None or self.dlambda is None: raise RuntimeError("Call interpolate(wavelength) before computing photometry.")
[docs] def get_photons(self, spectra: ArrayLike, spectra_err: Optional[ArrayLike] = None, mask_nan: bool = True) -> Tuple[u.Quantity, Optional[u.Quantity]]: """ Vectorized photon flux through all bandpasses. Parameters ---------- spectra : array or Quantity Shape (..., n_wave). Flux density per wavelength. spectra_err : array or Quantity, optional Same shape as spectra. mask_nan : bool If True, NaNs in spectra are treated as zero contribution. Returns ------- n_phot : Quantity Shape (..., n_bands) n_phot_err : Quantity or None Shape (..., n_bands) """ self._require_interpolated() wl = self.wavelength F = check_unit( spectra, default_unit=u.Lsun / u.angstrom / u.cm**2, equivalence=u.spectral_density, wav=wl, ) if F.shape[-1] != wl.size: raise ValueError(f"spectra last axis must be n_wave={wl.size}, got {F.shape[-1]}") # Mask NaNs if mask_nan: finite = np.isfinite(F) F = u.Quantity(np.where(finite, F.value, 0.0), unit=F.unit, copy=False) # Integrate along the last axis for each band n_phot = u.Quantity( np.einsum("...i,bi,i->...b", F, self.kernel_phot, self.dlambda), copy=False, ) if spectra_err is None: return n_phot, None Ferr = check_unit( spectra_err, default_unit=u.Lsun / u.angstrom / u.cm**2, equivalence=u.spectral_density, wav=wl, ) if Ferr.shape != F.shape: raise ValueError("spectra_err must have the same shape as spectra") if mask_nan: finite_e = np.isfinite(Ferr) Ferr = u.Quantity(np.where(finite_e, Ferr.value, 0.0), unit=Ferr.unit, copy=False) n_phot_err = u.Quantity( np.einsum("...i,bi,i->...b", Ferr, self.kernel_phot, self.dlambda), copy=False, ) return n_phot, n_phot_err
[docs] def get_fnu(self, spectra: ArrayLike, spectra_err: Optional[ArrayLike] = None, mask_nan: bool = True) -> Tuple[u.Quantity, Optional[u.Quantity]]: """ Compute synthetic f_nu in AB system for all bands. Returns ------- fnu : Quantity Shape (..., n_bands) in Jy fnu_err : Quantity or None Shape (..., n_bands) in Jy """ self._require_interpolated() n_phot, n_phot_err = self.get_photons(spectra, spectra_err=spectra_err, mask_nan=mask_nan) fnu0 = 3631 * u.Jy # Per band fnu = (n_phot / self.norm_phot) * fnu0 if n_phot_err is None: return fnu, None fnu_err = (n_phot_err / self.norm_phot) * fnu0 return fnu, fnu_err
[docs] def abmag(self, spectra: ArrayLike, spectra_err: Optional[ArrayLike] = None, mask_nan: bool = True) -> Tuple[u.Quantity, Optional[u.Quantity]]: """ Compute AB magnitudes for all bands. """ self._require_interpolated() n_phot, n_phot_err = self.get_photons(spectra, spectra_err=spectra_err, mask_nan=mask_nan) ratio = n_phot / self.norm_phot mag = -2.5 * np.log10(ratio) if n_phot_err is None: return mag, None mag_err = (2.5 / np.log(10)) * (n_phot_err / n_phot) return mag, mag_err
[docs] def plot(self, add_props=False, ax=None, show=False): """Plot the filters response curve. Parameters ---------- show: bool If True """ if ax is None: fig, ax = plt.subplots() else: fig = None for f in self.filters: f.plot(ax=ax, add_props=add_props, show=False) limits = self.wavelength_range() ax.set_xlim(limits[0].value, limits[1].value) if show: plt.show() else: plt.close() return fig, ax
[docs] class EquivalentWidth(object): r"""Equivalent width of an spectral region. Description ----------- Given a stellar spectra spectra :math:`F_\lambda`, the equivalent width is defined as the area of the spectral line, defined over a given spectral region (:math:`\lambda_{\rm C,\,min}, \lambda_{\rm C,\,max}`), divided by the average flux of the continuum in a given spectral region. It is computed as: .. math:: EW = \int_{\lambda_{C,\,\rm min}}^{\lambda_{\rm C,\,max}} \left(1 - \frac{F_\lambda(\lambda)}{F_{\rm cont}(\lambda)}\right) d\lambda where: .. math:: F_{cont}(\lambda) = \frac{F_{\rm B} \lambda_{\rm R} - F_{\rm R} \lambda_{\rm B}}{\lambda_{\rm R} - \lambda_{\rm B}} + \lambda \frac{F_{\rm R} - F_{\rm B}}{\lambda_{\rm R} - \lambda_{\rm B}} and :math:`F_{\rm B}` and :math:`F_{\rm R}` are the average flux in the left and right spectral windows, respectively, defined as: .. math:: F_{\rm B} = \frac{1}{\lambda_{\rm B,\,max} - \lambda_{\rm B,\,min}} \int_{\lambda_{\rm B,\,min}}^{\lambda_{\rm B,\,max}} F_\lambda(\lambda) d\lambda .. math:: F_{\rm R} = \frac{1}{\lambda_{\rm R,\,max} - \lambda_{\rm R,\,min}} \int_{\lambda_{\rm R,\,min}}^{\lambda_{\rm R,\,max}} F_\lambda(\lambda) d\lambda where :math:`\lambda_{\rm B,\,min}` and :math:`\lambda_{\rm B,\,max}` are the left spectral window boundaries, and :math:`\lambda_{\rm R,\,min}` and :math:`\lambda_{\rm R,\,max}` are the right spectral window boundaries. Example ------- >>> from pst.observables import EquivalentWidth >>> ew = EquivalentWidth(left_wl_range=(3700, 3900), ... central_wl_range=(4000, 4100), ... right_wl_range=(4200, 4400)) >>> wavelength = np.linspace(3600, 4600, 1000) * u.angstrom >>> spectra = np.random.normal(1, 0.1, size=wavelength.size) * u.erg / u.s / u.cm**2 / u.angstrom >>> ew_value, ew_err = ew.compute_ew(wavelength, spectra) """ def __init__(self, left_wl_range, central_wl_range, right_wl_range): self.left_wl_range = np.array(left_wl_range) self.central_wl_range = np.array(central_wl_range) self.right_wl_range = np.array(right_wl_range) @property def left_wl_range(self) -> u.Quantity: r"""Spectral range defining the left pseudocontinuum window :math:`\lambda_{\rm B,\,min}, \lambda_{\rm B,\,max}`.""" return self._left_wl_range @left_wl_range.setter def left_wl_range(self, value): if not isinstance(value, u.Quantity): self._left_wl_range = value * u.angstrom else: self._left_wl_range = value @property def right_wl_range(self) -> u.Quantity: r"""Spectral range defining the right pseudocontinuum window :math:`\lambda_{\rm R,\,min}, \lambda_{\rm R,\,max}`.""" return self._right_wl_range @right_wl_range.setter def right_wl_range(self, value): if not isinstance(value, u.Quantity): self._right_wl_range = value * u.angstrom else: self._right_wl_range = value @property def central_wl_range(self) -> u.Quantity: r"""Spectral range defining the equivalent width window :math:`\lambda_{\rm C,\,min}, \lambda_{\rm C,\,max}`.""" return self._central_wl_range @central_wl_range.setter def central_wl_range(self, value): if not isinstance(value, u.Quantity): self._central_wl_range = value * u.angstrom else: self._central_wl_range = value
[docs] def compute_ew(self, wavelength, spectra, spectra_err=None): """Compute the equivalent width of a given input spectra. Description ----------- The equivalent width is computed using the definition given in the class description. Positive values of the equivalent width indicate an absorption line, while negative values indicate an emission line. The error on the equivalent width is computed using the error propagation formula, assuming null covariance between the spectral points. Parameters ---------- spectra : :class:`np.ndarray` or :class:``astropy.units.Quantity`` Input spectra. If the array is multidimensional, the first axis must correspond to the spectral direction. wavelength : :class:`np.ndarray` or :class:``astropy.units.Quantity`` Wavelength array associated to ``spectra``. spectra_err : :class:`np.ndarray` or :class:``astropy.units.Quantity``, optional If provided, computed the associated error of the equivalent width. Returns ------- ew : np.ndarray The equivalent width of the input spectra. ew_err : np.ndarray The associated error of the equivalent width. """ left_pts = np.where(np.searchsorted(self.left_wl_range, wavelength) == 1)[0] right_pts = np.where(np.searchsorted(self.right_wl_range, wavelength) == 1)[0] lick_pts = np.where(np.searchsorted(self.central_wl_range, wavelength) == 1)[0] delta_ew = self.central_wl_range[1] - self.central_wl_range[0] # pseudo-continuum interpolation right_weight = (self.central_wl_range.mean() - self.left_wl_range.mean() ) / (self.right_wl_range.mean() - self.left_wl_range.mean()) if spectra.ndim > 1: left_cont = np.nanmean(spectra[left_pts, :], axis=0) right_cont = np.nanmean(spectra[right_pts, :], axis=0) pseudocont = (1 - right_weight) * left_cont + right_weight * right_cont flux = np.nanmean(spectra[lick_pts], axis=0) ew = delta_ew * (1 - flux/pseudocont) if spectra_err is None: ew_err = np.nan else: left_cont_var = np.nanmean(spectra_err[left_pts, :]**2, axis=0) / left_pts.size right_cont_var = np.nanmean(spectra_err[right_pts, :]**2, axis=0) / right_pts.size pseudocont_var = ((1 - right_weight) * left_cont_var + right_weight * right_cont_var) flux_var = np.nanmean(spectra_err[lick_pts]**2, axis=0) ew_var = ((delta_ew / pseudocont)**2 * flux_var + (delta_ew * flux/pseudocont**2)**2 * pseudocont_var) ew_err = np.sqrt(ew_var) else: left_cont = np.nanmean(spectra[left_pts]) right_cont = np.nanmean(spectra[right_pts]) pseudocont = (1 - right_weight) * left_cont + right_weight * right_cont flux = np.nanmean(spectra[lick_pts]) ew = delta_ew * (1 - flux/pseudocont) if spectra_err is None: ew_err = np.nan else: left_cont_var = np.nanmean(spectra_err[left_pts]**2 ) / left_pts.size right_cont_var = np.nanmean(spectra_err[right_pts]**2 ) / right_pts.size pseudocont_var = ((1 - right_weight) * left_cont_var + right_weight * right_cont_var) flux_var = np.nanmean(spectra_err[lick_pts]**2) ew_var = delta_ew**2 * ( flux_var / lick_pts.size / pseudocont**2 + flux**2 * pseudocont_var / pseudocont**4) ew_err = np.sqrt(ew_var) return ew, ew_err
[docs] @classmethod def from_json(cls, path): """Load a :class:`EquivalentWidth` from a JSON file. Parameters ---------- path : str Path to the JSON file. Returns ------- ew : :class:`EquivvalentWidth` """ with open(path, "r") as f: data = json.load(f) return cls(**data)
[docs] @classmethod def from_name(cls, name): """Load a :class:`EquivalentWidth` from a JSON file. Parameters ---------- name : str Name of the Lick index. Returns ------- ew : :class:`EquivvalentWidth` """ json_file = os.path.join(PST_DATA_DIR, "lick", name + ".json") if os.path.isfile(json_file): return cls.from_json(json_file) else: raise FileNotFoundError(f"There is no JSON file\n -{json_file}" f"associated to input name {name}")