Source code for jwst.pathloss.pathloss

# Module for calculating pathloss correction for science data sets

import logging
import math

import numpy as np
import stdatamodels.jwst.datamodels as datamodels
from gwcs import wcstools
from stcal.alignment.util import compute_scale

from jwst.assign_wcs import nirspec
from jwst.lib.pipe_utils import match_nans_and_flags
from jwst.lib.wcs_utils import get_wavelengths

log = logging.getLogger(__name__)

# There are 30 slices in the NIRSpec IFU, numbered from 0 to 29
NIRSPEC_IFU_SLICES = np.arange(30)

__all__ = [
    "get_center",
    "shutter_above_is_closed",
    "shutter_below_is_closed",
    "get_aperture_from_model",
    "calculate_pathloss_vector",
    "calculate_two_shutter_uniform_pathloss",
    "do_correction",
    "interpolate_onto_grid",
    "is_pointsource",
    "do_correction_mos",
    "do_correction_fixedslit",
    "do_correction_ifu",
    "do_correction_lrs",
    "do_correction_soss",
]


[docs] def get_center(exp_type, input_model, offsets=False): """ Get the center of the target in the aperture. ``(0.0, 0.0)`` is the aperture center. Coordinates go from -0.5 to 0.5. Parameters ---------- exp_type : str Keyword value input_model : `~stdatamodels.jwst.datamodels.JwstDataModel` Science data to be corrected offsets : bool Only applies for MIRI LRS fixed-slit; if `True`, the offsets will be returned as well in the form of ``(imx, imy)`` Returns ------- xcenter : float X-coordinate center of the target in the aperture ycenter : float Y-coordinate center of the target in the aperture imx : float, optional X-location relative to LRS aperture reference point imy : float, optional Y-location relative to LRS aperture reference point """ if exp_type not in [ "NRS_MSASPEC", "NRS_FIXEDSLIT", "NRS_BRIGHTOBJ", "NRS_IFU", "MIR_LRS-FIXEDSLIT", ]: log.warning(f"get_center not implemented for exp_type {exp_type}") log.warning("Using (0.0, 0.0)") return 0.0, 0.0 if exp_type == "NRS_IFU": # Currently assume IFU sources are centered return 0.0, 0.0 # MSA centering is specified in the MultiSlit model # "input_model" treated as a slit object # can't use getattr here because of bug where calling getattr will auto-populate # schema-defined attributes if input_model.hasattr("source_xpos"): xcenter = input_model.source_xpos else: xcenter = None if input_model.hasattr("source_ypos"): ycenter = input_model.source_ypos else: ycenter = None if exp_type in ["NRS_MSASPEC", "NRS_FIXEDSLIT", "NRS_BRIGHTOBJ"]: if xcenter is not None and ycenter is not None: log.info(f"Using source_xpos, source_ypos = {xcenter, ycenter}") return xcenter, ycenter log.warning("Unable to get source center from model source_xpos, source_ypos") log.warning("Using 0.0, 0.0") xcenter = 0.0 ycenter = 0.0 return xcenter, ycenter # only MIR_LRS-FIXEDSLIT left. Get center from target ra, dec # get slit reference point from wcs object det_to_sky = input_model.meta.wcs.get_transform("detector", "world") sky_to_det = input_model.meta.wcs.get_transform("world", "detector") imx = -det_to_sky.offset_1 # aperture ref point from specwcs imy = -det_to_sky.offset_2 if xcenter is None or ycenter is None: # compute location of target on detector _ref_ra, _ref_dec, ref_wave = det_to_sky(imx, imy) xcenter, ycenter = sky_to_det( input_model.meta.target.ra, input_model.meta.target.dec, ref_wave ) log.info(f"LRS target location from RA/Dec = {xcenter, ycenter}") else: log.info(f"LRS target location from source_xpos, source_ypos = {xcenter, ycenter}") # compute location relative to LRS aperture reference point xcenter -= imx ycenter -= imy if offsets: return xcenter, ycenter, imx, imy return xcenter, ycenter
[docs] def shutter_above_is_closed(shutter_state): """ Check if the shutter above the target shutter is closed. Parameters ---------- shutter_state : str String that describes the shutter state. Returns ------- result : bool `True` if the shutter above the target shutter is closed. """ ref_loc = shutter_state.find("x") nshutters = len(shutter_state) if ref_loc == nshutters - 1 or shutter_state[ref_loc + 1] == "0": return True else: return False
[docs] def shutter_below_is_closed(shutter_state): """ Check if the shutter below the target shutter is closed. Parameters ---------- shutter_state : str String that describes the shutter state. Returns ------- result : bool `True` if the shutter below the target shutter is closed. """ ref_loc = shutter_state.find("x") if ref_loc == 0 or shutter_state[ref_loc - 1] == "0": return True else: return False
[docs] def get_aperture_from_model(input_model, match): """ Determine the correct aperture in the model to use. Based on the value of the 'match' parameter: * For MSA, match is the shutter state string. * For fixed slit, match is the name of the slit. Parameters ---------- input_model : `~stdatamodels.jwst.datamodels.JwstDataModel` Science data to be corrected match : str Aperture name or shutter state Returns ------- aperture : str or None Aperture name """ if input_model.meta.exposure.type == "NRS_MSASPEC": # Currently there are only 2 apertures in the MSA pathloss reference file: 1x1 and 1x3 # Only return the 1x1 aperture if the reference shutter has closed shutters above and below if shutter_below_is_closed(match) and shutter_above_is_closed(match): matchsize = 1 else: matchsize = 3 for aperture in input_model.apertures: # Only return the aperture if aperture.shutters == matchsize: return aperture elif input_model.meta.exposure.type in ["NRS_FIXEDSLIT", "NRS_BRIGHTOBJ", "NIS_SOSS"]: for aperture in input_model.apertures: log.debug(aperture.name) if aperture.name == match: return aperture else: log.warning(f"Unable to get aperture from exp_type {input_model.meta.exposure.type}") # If nothing matches, return None return None
[docs] def calculate_pathloss_vector(pathloss_refdata, pathloss_wcs, xcenter, ycenter, calc_wave=True): """ Calculate the pathloss vectors from the pathloss model. Use the coordinates of the center of the target to interpolate the pathloss value as a function of wavelength at that location Parameters ---------- pathloss_refdata : ndarray The input pathloss data array pathloss_wcs : `~gwcs.wcs.WCS` The pathloss datamodel's WCS attribute xcenter : float The x-center of the target (-0.5 to 0.5) ycenter : float The y-center of the target (-0.5 to 0.5) calc_wave : bool Calculate a wavelength vector from the reference file Returns ------- wavelength : ndarray The 1-D wavelength array pathloss : ndarray The corresponding 1-D pathloss array is_inside_slitlet : bool `True` if the object source position is inside the slitlet, otherwise `False` """ is_inside_slitlet = True if len(pathloss_refdata.shape) == 3: wavesize, nrows, ncols = pathloss_refdata.shape else: wavesize = pathloss_refdata.shape[0] wavelength = np.zeros(wavesize, dtype=np.float32) pathloss_vector = np.zeros(wavesize, dtype=np.float32) # uniformsource.data is 1-d. We just return it, along with # a vector of wavelengths calculated using the WCS. # Uniform source is always inside the slitlet if len(pathloss_refdata.shape) == 1: crpix1 = pathloss_wcs.crpix1 crval1 = pathloss_wcs.crval1 cdelt1 = pathloss_wcs.cdelt1 for i in np.arange(wavesize): wavelength[i] = crval1 + (float(i + 1) - crpix1) * cdelt1 return wavelength, pathloss_refdata, True # pointsource.data is 3-d, so we have to extract a wavelength vector # at the specified location. We do this using bilinear interpolation else: # If requested, calculate a wavelength vector from the ref file # WCS info if calc_wave: crpix3 = pathloss_wcs.crpix3 crval3 = pathloss_wcs.crval3 cdelt3 = pathloss_wcs.cdelt3 for i in np.arange(wavesize): wavelength[i] = crval3 + (float(i + 1) - crpix3) * cdelt3 # Calculate python index of object center crpix1 = pathloss_wcs.crpix1 crval1 = pathloss_wcs.crval1 cdelt1 = pathloss_wcs.cdelt1 crpix2 = pathloss_wcs.crpix2 crval2 = pathloss_wcs.crval2 cdelt2 = pathloss_wcs.cdelt2 object_colindex = crpix1 + (xcenter - crval1) / cdelt1 - 1 object_rowindex = crpix2 + (ycenter - crval2) / cdelt2 - 1 # check whether target is inside slit boundaries if ( object_colindex < 0 or object_colindex >= (ncols - 1) or object_rowindex < 0 or object_rowindex >= (nrows - 1) ): is_inside_slitlet = False else: # Do bilinear interpolation to get the array of # path loss vs wavelength dx1 = object_colindex - int(object_colindex) dx2 = 1.0 - dx1 dy1 = object_rowindex - int(object_rowindex) dy2 = 1.0 - dy1 a11 = dx1 * dy1 a12 = dx1 * dy2 a21 = dx2 * dy1 a22 = dx2 * dy2 j, i = int(object_colindex), int(object_rowindex) pathloss_vector = ( a22 * pathloss_refdata[:, i, j] + a21 * pathloss_refdata[:, i + 1, j] + a12 * pathloss_refdata[:, i, j + 1] + a11 * pathloss_refdata[:, i + 1, j + 1] ) return wavelength, pathloss_vector, is_inside_slitlet
[docs] def calculate_two_shutter_uniform_pathloss(pathloss_model): """ Calculate pathloss for uniform source, two-shutter slit. The two-shutter MOS case for uniform source calculation requires a custom routine since it uses both the 1X1 and 1X3 extensions of the pathloss reference file. Parameters ---------- pathloss_model : `~stdatamodels.jwst.datamodels.PathlossModel` The pathloss datamodel Returns ------- wavelength, pathloss_vector : ndarray The wavelength and pathloss 1-D arrays, respectively """ # This routine will run if the slit has exactly 2 shutters n_apertures = len(pathloss_model.apertures) if n_apertures != 2: log.warning(f"Expected 2 apertures in pathloss reference file, found {n_apertures}") return (None, None) for aperture in pathloss_model.apertures: aperture_name = aperture.name.upper() if aperture_name == "MOS1X1": aperture1x1 = aperture elif aperture_name == "MOS1X3": aperture1x3 = aperture if aperture_name not in ["MOS1X1", "MOS1X3"]: log.warning( f"Unexpected aperture name '{aperture_name}' (Expected 'MOS1X1' or 'MOS1X3')" ) return None, None pathloss1x1 = aperture1x1.uniform_data pathloss1x3 = aperture1x3.uniform_data if len(pathloss1x1) != len(pathloss1x3): log.warning("Pathloss 1x1 and 1x3 arrays have different sizes") return None, None if pathloss1x1.ndim != 1: log.warning(f"Pathloss arrays have unexpected shape {pathloss1x1.shape} (Expected 1D)") return None, None wcs_key = ["crval1", "crpix1", "cdelt1"] for key in wcs_key: if getattr(aperture1x1.uniform_wcs, key) != getattr(aperture1x3.uniform_wcs, key): log.warning("1x1 and 1x3 apertures have different WCS") return None, None wavesize = len(pathloss1x1) wavelength = np.zeros(wavesize) crpix1 = aperture1x1.uniform_wcs.crpix1 crval1 = aperture1x1.uniform_wcs.crval1 cdelt1 = aperture1x1.uniform_wcs.cdelt1 for i in np.arange(wavesize): wavelength[i] = crval1 + (float(i + 1) - crpix1) * cdelt1 average_pathloss = 0.5 * (pathloss1x1 + pathloss1x3) log.info("2 shutter slit: Uniform correction averages corrections for 1x1 and 1x3 apertures") return wavelength, average_pathloss
[docs] def do_correction( input_model, pathloss_model=None, inverse=False, source_type=None, user_slit_loc=None, return_corrections=True, ): """ Execute all tasks for Path Loss Correction. Parameters ---------- input_model : `~stdatamodels.jwst.datamodels.JwstDataModel` Science data to be corrected. Updated in place. pathloss_model : `~stdatamodels.jwst.datamodels.MirLrsPathlossModel`, \ `~stdatamodels.jwst.datamodels.PathlossModel`, or None, optional Pathloss correction data inverse : bool, optional Invert the math operations used to apply the pathloss correction. source_type : str or None, optional Force processing using the specified source type. user_slit_loc : float, optional User-provided slit location in units of arcsec, where ``(0, 0)`` is the center and the edges are +/-0.255 arcsec. return_corrections : bool, optional If `True`, a model containing the applied corrections is returned. Returns ------- input_model : `~stdatamodels.jwst.datamodels.JwstDataModel` The corrected science data with pathloss extensions added. corrections : `~stdatamodels.jwst.datamodels.JwstDataModel`, optional A model of the correction arrays, returned if ``return_corrections`` is `True`. """ if not pathloss_model: raise RuntimeError("A PathlossModel must be specified.") exp_type = input_model.meta.exposure.type log.info(f"Input exposure type is {exp_type}") corrections = None if exp_type == "NRS_MSASPEC": corrections = do_correction_mos(input_model, pathloss_model, inverse, source_type) elif exp_type in ["NRS_FIXEDSLIT", "NRS_BRIGHTOBJ"]: corrections = do_correction_fixedslit(input_model, pathloss_model, inverse, source_type) elif exp_type == "NRS_IFU": corrections = do_correction_ifu(input_model, pathloss_model, inverse, source_type) elif exp_type in ["MIR_LRS-FIXEDSLIT", "MIR_WFSS"]: # only apply correction to LRS fixed-slit if target is point source if is_pointsource(input_model.meta.target.source_type): do_correction_lrs(input_model, pathloss_model, user_slit_loc) else: log.warning("Not a point source; skipping correction for LRS.") input_model.meta.cal_step.pathloss = "SKIPPED" elif exp_type == "NIS_SOSS": if inverse: log.warning("Use of inversion with NIS_SOSS is not implemented. Skipping") input_model.meta.cal_step.pathloss = "SKIPPED" elif source_type is not None: log.warning("Forcing of source type with NIS_SOSS is not implemented. Skipping") input_model.meta.cal_step.pathloss = "SKIPPED" else: do_correction_soss(input_model, pathloss_model) if return_corrections: return input_model, corrections else: return input_model
[docs] def interpolate_onto_grid(wavelength_grid, wavelength_vector, pathloss_vector): """ Interpolate pathloss value onto grid. Get the value of pathloss by interpolating each non-NaN element of ``wavelength_grid`` into ``pathloss_vector`` using the index lookup of ``wavelength_vector``. Pixels with wavelengths outside the range of the reference file should have a correction of NaN. Parameters ---------- wavelength_grid : ndarray The 2-D grid of wavelengths for each science data pixel wavelength_vector : ndarray The 1-D vector of wavelengths pathloss_vector : ndarray Corresponding 1-D vector of pathloss values Returns ------- pathloss_grid : ndarray Grid of pathloss corrections for each non-NaN pixel """ # Need to set the pathloss correction of pixels whose wavelength is outside # the wavelength range of the reference file to NaN. This trick will accomplish # that while still allowing the use of array linear interpolation # # Pad out the wavelength and pathloss vectors by adding another element # at the beginning and end, and put NaN in the new first and last elements # of the extended pathloss vector extended_pathloss_vector = np.zeros(len(pathloss_vector) + 2) extended_pathloss_vector[1:-1] = pathloss_vector extended_pathloss_vector[0] = np.nan extended_pathloss_vector[-1] = np.nan extended_wavelength_vector = np.zeros(len(wavelength_vector) + 2) extended_wavelength_vector[1:-1] = wavelength_vector extended_wavelength_vector[0] = wavelength_vector[0] - 0.1 extended_wavelength_vector[-1] = wavelength_vector[-1] + 0.1 # Find the indices in the original wavelength array that correspond # to the lower of 2 adjacent wavelength values spanning the wavelength # values in the wavelength grid. NaNs and values > max wavelength will # return an index to an element 1 past the array, values below min wavelength # will return 0 upper_indices = np.searchsorted(wavelength_vector, wavelength_grid) # Move these indices so they correspond to the extended arrays lower_indices = upper_indices upper_indices = upper_indices + 1 # Now we can just proceed without worrying about values outside the wavelength # array numerator = wavelength_grid - extended_wavelength_vector[lower_indices] denominator = ( extended_wavelength_vector[upper_indices] - extended_wavelength_vector[lower_indices] ) fraction = numerator / denominator pathloss_grid = wavelength_grid * 0.0 pathloss_grid = extended_pathloss_vector[lower_indices] + fraction * ( extended_pathloss_vector[upper_indices] - extended_pathloss_vector[lower_indices] ) return pathloss_grid
[docs] def is_pointsource(srctype): """ Check whether input is a point source. Parameters ---------- srctype : str Determined type of source. Returns ------- result : bool `True` if ``srctype`` is "POINT" """ if srctype is None: return False elif srctype.upper() == "POINT": return True else: return False
[docs] def do_correction_mos(data, pathloss, inverse=False, source_type=None): """ Path loss correction for NIRSpec MOS. Data are modified in-place. Parameters ---------- data : `~stdatamodels.jwst.datamodels.JwstDataModel` The NIRSpec MOS data to be corrected. pathloss : `~stdatamodels.jwst.datamodels.PathlossModel` or None The pathloss reference data. inverse : bool Invert the math operations used to apply the pathloss correction. source_type : str or None Force processing using the specified source type. Returns ------- corrections : `~stdatamodels.jwst.datamodels.MultiSlitModel` The pathloss corrections applied. """ exp_type = data.meta.exposure.type # Loop over all MOS slitlets corrections = datamodels.MultiSlitModel() correction_found = False for slit_number, slit in enumerate(data.slits): log.info(f"Working on slit {slit_number}") correction = _corrections_for_mos(slit, pathloss, exp_type, source_type) corrections.slits.append(correction) # Apply the correction if not correction: log.warning(f"No correction provided for slit {slit_number}. Skipping") continue correction_found = True if not inverse: slit.data /= correction.data slit.err /= correction.data slit.var_poisson /= correction.data**2 slit.var_rnoise /= correction.data**2 if slit.var_flat is not None and np.size(slit.var_flat) > 0: slit.var_flat /= correction.data**2 else: slit.data *= correction.data slit.err *= correction.data slit.var_poisson *= correction.data**2 slit.var_rnoise *= correction.data**2 if slit.var_flat is not None and np.size(slit.var_flat) > 0: slit.var_flat *= correction.data**2 slit.pathloss_point = correction.pathloss_point slit.pathloss_uniform = correction.pathloss_uniform slit.pathloss_correction_type = correction.pathloss_correction_type # Make sure all NaNs and flags match up in the output slit model match_nans_and_flags(slit) # Set step status to complete if correction_found: data.meta.cal_step.pathloss = "COMPLETE" else: data.meta.cal_step.pathloss = "SKIPPED" return corrections
[docs] def do_correction_fixedslit(data, pathloss, inverse=False, source_type=None): """ Path loss correction for NIRSpec fixed-slit modes. Data are modified in-place. Parameters ---------- data : `~stdatamodels.jwst.datamodels.JwstDataModel` The NIRSpec fixed-slit data to be corrected. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data. inverse : bool Invert the math operations used to apply the pathloss correction. source_type : str or None Force processing using the specified source type. Returns ------- corrections : `~stdatamodels.jwst.datamodels.MultiSlitModel` The pathloss corrections applied. """ exp_type = data.meta.exposure.type # Loop over all slits contained in the input corrections = datamodels.MultiSlitModel() correction_found = False for slit_number, slit in enumerate(data.slits): log.info(f"Working on slit {slit.name}") correction = _corrections_for_fixedslit(slit, pathloss, exp_type, source_type) corrections.slits.append(correction) # Apply the correction if not correction: log.warning(f"No correction provided for slit {slit_number}. Skipping") continue correction_found = True if not inverse: slit.data /= correction.data slit.err /= correction.data if slit.var_poisson is not None: slit.var_poisson /= correction.data**2 if slit.var_rnoise is not None: slit.var_rnoise /= correction.data**2 if slit.var_flat is not None: slit.var_flat /= correction.data**2 else: slit.data *= correction.data slit.err *= correction.data if slit.var_poisson is not None: slit.var_poisson *= correction.data**2 if slit.var_rnoise is not None: slit.var_rnoise *= correction.data**2 if slit.var_flat is not None: slit.var_flat *= correction.data**2 slit.pathloss_point = correction.pathloss_point slit.pathloss_uniform = correction.pathloss_uniform slit.pathloss_correction_type = correction.pathloss_correction_type # Make sure all NaNs and flags match up in the output slit model match_nans_and_flags(slit) # Set step status to complete if correction_found: data.meta.cal_step.pathloss = "COMPLETE" else: data.meta.cal_step.pathloss = "SKIPPED" return corrections
[docs] def do_correction_ifu(data, pathloss, inverse=False, source_type=None): """ Path loss correction for NIRSpec IFU. Data are modified in-place. Parameters ---------- data : `~stdatamodels.jwst.datamodels.JwstDataModel` The NIRSpec IFU data to be corrected. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data. inverse : bool Invert the math operations used to apply the pathloss correction. source_type : str or None Force processing using the specified source type. Returns ------- corrections : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss corrections applied. """ correction = _corrections_for_ifu(data, pathloss, source_type) # For IFU, the pathloss correction type is not stored in the model. # Just log it here. log.info(f"Correction type used: {correction.pathloss_correction_type}") if not inverse: data.data /= correction.data data.err /= correction.data data.var_poisson /= correction.data**2 data.var_rnoise /= correction.data**2 if data.var_flat is not None and np.size(data.var_flat) > 0: data.var_flat /= correction.data**2 else: data.data *= correction.data data.err *= correction.data data.var_poisson *= correction.data**2 data.var_rnoise *= correction.data**2 if data.var_flat is not None and np.size(data.var_flat) > 0: data.var_flat *= correction.data**2 data.pathloss_point = correction.pathloss_point data.pathloss_uniform = correction.pathloss_uniform # This might be useful to other steps data.wavelength = correction.wavelength # Make sure all NaNs and flags match up in the output data model match_nans_and_flags(data) # Set the step status to complete data.meta.cal_step.pathloss = "COMPLETE" return correction
[docs] def do_correction_lrs(data, pathloss, user_slit_loc): """ Path loss correction for MIRI LRS fixed-slit. Data are modified in-place. Parameters ---------- data : `~stdatamodels.jwst.datamodels.JwstDataModel` The MIRI LRS fixed-slit data to be corrected. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data. user_slit_loc : float User-provided slit location in units of arcsec, where ``(0, 0)`` is the center and the edges are +/-0.255 arcsec. """ correction = _corrections_for_lrs(data, pathloss, user_slit_loc) # LRS correction is multiplicative data.data *= correction.data data.err *= correction.data data.var_poisson *= correction.data**2 data.var_rnoise *= correction.data**2 if data.var_flat is not None and np.size(data.var_flat) > 0: data.var_flat *= correction.data**2 data.pathloss_point = correction.pathloss_point # This might be useful to other steps data.wavelength = correction.wavelength # Make sure all NaNs and flags match up in the output data model match_nans_and_flags(data) # Set the step status to complete data.meta.cal_step.pathloss = "COMPLETE" return
[docs] def do_correction_soss(data, pathloss): """ Path loss correction for NIRISS SOSS. NIRISS SOSS pathloss correction is basically a correction for the flux from the 2nd and 3rd order dispersion that falls outside the subarray aperture. The correction depends on the pupil wheel position and column number (or wavelength). The simple option is to do the correction by column number, then the only interpolation needed is a 1-D interpolation into the pupil wheel position dimension. Data are modified in-place. Parameters ---------- data : `~stdatamodels.jwst.datamodels.JwstDataModel` The NIRISS SOSS data to be corrected. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data. """ # Omit correction if this is a TSO observation if data.meta.visit.tsovisit: log.warning("NIRISS SOSS TSO observations skip the pathloss step") data.meta.cal_step.pathloss = "SKIPPED" return # Get the pupil wheel position pupil_wheel_position = data.meta.instrument.pupil_position if pupil_wheel_position is None: log.warning( f"Unable to get pupil wheel position from PWCPOS keyword for {data.meta.filename}" ) log.warning("Pathloss correction skipped") data.meta.cal_step.pathloss = "SKIPPED" return # Get the aperture from the reference file that matches the subarray subarray = data.meta.subarray.name aperture = get_aperture_from_model(pathloss, subarray) if aperture is None: log.warning(f"Unable to get Aperture from reference file for subarray {subarray}") log.warning("Pathloss correction skipped") data.meta.cal_step.pathloss = "SKIPPED" return else: log.info(f"Aperture {aperture.name} selected from reference file") # Set up pathloss correction array pathloss_array = aperture.pointsource_data[0] nrows, ncols = pathloss_array.shape _, data_ncols = data.data.shape correction = np.ones(data_ncols, dtype=np.float32) crpix1 = aperture.pointsource_wcs.crpix1 crval1 = aperture.pointsource_wcs.crval1 cdelt1 = aperture.pointsource_wcs.cdelt1 pupil_wheel_index = crpix1 + (pupil_wheel_position - crval1) / cdelt1 - 1 if pupil_wheel_index < 0 or pupil_wheel_index > (ncols - 2): log.warning("Pupil Wheel position outside reference file coverage") log.warning("Setting pathloss correction to 1.0") else: ix = int(pupil_wheel_index) dx = pupil_wheel_index - ix crpix2 = aperture.pointsource_wcs.crpix2 crval2 = aperture.pointsource_wcs.crval2 cdelt2 = aperture.pointsource_wcs.cdelt2 for row in range(data_ncols): row_1indexed = row + 1 refrow_index = math.floor(crpix2 + (row_1indexed - crval2) / cdelt2 - 0.5) if refrow_index < 0 or refrow_index > (nrows - 1): correction[row] = 1.0 else: correction[row] = (1.0 - dx) * pathloss_array[ refrow_index, ix ] + dx * pathloss_array[refrow_index, ix + 1] # Create and apply the 2D correction pathloss_2d = np.broadcast_to(correction, data.data.shape) data.data /= pathloss_2d data.err /= pathloss_2d data.var_poisson /= pathloss_2d**2 data.var_rnoise /= pathloss_2d**2 if data.var_flat is not None and np.size(data.var_flat) > 0: data.var_flat /= pathloss_2d**2 data.pathloss_point = pathloss_2d # Make sure all NaNs and flags match up in the output data model match_nans_and_flags(data) # Set step status to complete data.meta.cal_step.pathloss = "COMPLETE"
def _corrections_for_mos(slit, pathloss, exp_type, source_type=None): """ Calculate the correction arrays for MOS slit. Parameters ---------- slit : `~stdatamodels.jwst.datamodels.SlitModel` The slit being operated on. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data exp_type : str Exposure type source_type : str or None Force processing using the specified source type. Returns ------- correction : `~stdatamodels.jwst.datamodels.SlitModel` The correction arrays """ correction = None size = slit.data.size # Only work on slits with data.size > 0 if size <= 0: log.warning(f"Slit has data size = {size}") return correction # Get centering xcenter, ycenter = get_center(exp_type, slit) # Calculate the 1-d wavelength and pathloss vectors # for the source position # Get the aperture from the reference file that matches the slit slitlength = len(slit.shutter_state) aperture = get_aperture_from_model(pathloss, slit.shutter_state) if aperture is None: log.warning(f"Cannot find matching pathloss model for slit with {slitlength} shutters") return correction log.info(f"Shutter state = {slit.shutter_state}, using {aperture.name} entry in ref file") two_shutters = False if slitlength == 2: two_shutters = True if shutter_below_is_closed(slit.shutter_state) and not shutter_above_is_closed( slit.shutter_state ): ycenter = ycenter - 1.0 log.info("Shutter below fiducial is closed, using lower region of pathloss array") if not shutter_below_is_closed(slit.shutter_state) and shutter_above_is_closed( slit.shutter_state ): ycenter = ycenter + 1.0 log.info("Shutter above fiducial is closed, using upper region of pathloss array") (wavelength_pointsource, pathloss_pointsource_vector, is_inside_slitlet) = ( calculate_pathloss_vector( aperture.pointsource_data, aperture.pointsource_wcs, xcenter, ycenter ) ) if two_shutters: (wavelength_uniformsource, pathloss_uniform_vector) = ( calculate_two_shutter_uniform_pathloss(pathloss) ) else: (wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector( aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter ) # This should only happen if the 2 shutter uniform pathloss calculation has an error if wavelength_uniformsource is None or pathloss_uniform_vector is None: log.warning("Unable to calculate 2 shutter uniform pathloss.") log.warning("Using 3 shutter aperture.") (wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector( aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter ) if not is_inside_slitlet: log.warning("Source is outside slit.") return correction # Wavelengths in the reference file are in meters, # need them to be in microns wavelength_pointsource *= 1.0e6 wavelength_uniformsource *= 1.0e6 wavelength_array = slit.wavelength # Compute the point source pathloss 2D correction pathloss_2d_ps = interpolate_onto_grid( wavelength_array, wavelength_pointsource, pathloss_pointsource_vector ) # Compute the uniform source pathloss 2D correction pathloss_2d_un = interpolate_onto_grid( wavelength_array, wavelength_uniformsource, pathloss_uniform_vector ) # Use the appropriate correction for this slit if is_pointsource(source_type or slit.source_type): pathloss_2d = pathloss_2d_ps correction_type = "POINT" else: pathloss_2d = pathloss_2d_un correction_type = "UNIFORM" # Save the corrections. The `data` portion is the correction used. # The individual ones will be saved in the respective attributes. correction = datamodels.SlitModel(data=pathloss_2d) correction.pathloss_point = pathloss_2d_ps correction.pathloss_uniform = pathloss_2d_un correction.pathloss_correction_type = correction_type return correction def _corrections_for_fixedslit(slit, pathloss, exp_type, source_type): """ Calculate the correction arrays for fixed-slit. Parameters ---------- slit : `~stdatamodels.jwst.datamodels.SlitModel` The slit being operated on. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data exp_type : str Exposure type source_type : str or None Force processing using the specified source type. Returns ------- correction : `~stdatamodels.jwst.datamodels.SlitModel` The correction arrays """ correction = None # Get centering xcenter, ycenter = get_center(exp_type, slit) # Calculate the 1-d wavelength and pathloss vectors for the source position # Get the aperture from the reference file that matches the slit aperture = get_aperture_from_model(pathloss, slit.name) if aperture is None: log.warning(f"Cannot find matching pathloss model for {slit.name}") log.warning("Skipping pathloss correction for this slit") return correction log.info(f"Using aperture {aperture.name}") (wavelength_pointsource, pathloss_pointsource_vector, is_inside_slit) = ( calculate_pathloss_vector( aperture.pointsource_data, aperture.pointsource_wcs, xcenter, ycenter ) ) (wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector( aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter ) if not is_inside_slit: log.warning(f"Source is outside slit. Skipping pathloss correction for slit {slit.name}") return correction # Wavelengths in the reference file are in meters, # need them to be in microns wavelength_pointsource *= 1.0e6 wavelength_uniformsource *= 1.0e6 # Use the appropriate correction for this slit if is_pointsource(source_type or slit.source_type): # calculate the point source corrected wavelengths and uncorrected wavelengths # for the slit wavelength_array_corr = get_wavelengths(slit, use_wavecorr=True) wavelength_array_uncorr = get_wavelengths(slit, use_wavecorr=False) # Compute the point source pathloss 2D correction pathloss_2d_ps = interpolate_onto_grid( wavelength_array_corr, wavelength_pointsource, pathloss_pointsource_vector ) # Compute the uniform source pathloss 2D correction pathloss_2d_un = interpolate_onto_grid( wavelength_array_uncorr, wavelength_uniformsource, pathloss_uniform_vector ) pathloss_2d = pathloss_2d_ps correction_type = "POINT" else: wavelength_array = slit.wavelength # Compute the point source pathloss 2D correction pathloss_2d_ps = interpolate_onto_grid( wavelength_array, wavelength_pointsource, pathloss_pointsource_vector ) # Compute the uniform source pathloss 2D correction pathloss_2d_un = interpolate_onto_grid( wavelength_array, wavelength_uniformsource, pathloss_uniform_vector ) pathloss_2d = pathloss_2d_un correction_type = "UNIFORM" # Save the corrections. The `data` portion is the correction used. # The individual ones will be saved in the respective attributes. correction = datamodels.SlitModel(data=pathloss_2d) correction.pathloss_point = pathloss_2d_ps correction.pathloss_uniform = pathloss_2d_un correction.pathloss_correction_type = correction_type return correction def _corrections_for_ifu(data, pathloss, source_type): """ Calculate the correction arrays for IFU. Parameters ---------- data : `~stdatamodels.jwst.datamodels.SlitModel` The data being operated on. pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel` The pathloss reference data source_type : str or None Force processing using the specified source type. Returns ------- correction : `~stdatamodels.jwst.datamodels.SlitModel` The correction arrays """ # IFU targets are always inside slit # Get centering xcenter, ycenter = get_center(data.meta.exposure.type, None) # Calculate the 1-d wavelength and pathloss vectors for the source position aperture = pathloss.apertures[0] (wavelength_pointsource, pathloss_pointsource_vector, _) = calculate_pathloss_vector( aperture.pointsource_data, aperture.pointsource_wcs, xcenter, ycenter ) (wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector( aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter ) # Wavelengths in the reference file are in meters; # need them to be in microns wavelength_pointsource *= 1.0e6 wavelength_uniformsource *= 1.0e6 # Create the 2-d wavelength arrays, initialize with NaNs wavelength_array = np.zeros(data.shape, dtype=np.float32) wavelength_array.fill(np.nan) for this_slice in NIRSPEC_IFU_SLICES: slice_wcs = nirspec.nrs_wcs_set_input(data, this_slice) x, y = wcstools.grid_from_bounding_box(slice_wcs.bounding_box) ra, dec, wavelength = slice_wcs(x, y) valid = ~np.isnan(wavelength) x = x[valid] y = y[valid] wavelength_array[y.astype(int), x.astype(int)] = wavelength[valid] # Compute the point source pathloss 2D correction pathloss_2d_ps = interpolate_onto_grid( wavelength_array, wavelength_pointsource, pathloss_pointsource_vector ) # Compute the uniform source pathloss 2D correction pathloss_2d_un = interpolate_onto_grid( wavelength_array, wavelength_uniformsource, pathloss_uniform_vector ) # Use the appropriate correction for the source type if is_pointsource(source_type or data.meta.target.source_type): pathloss_2d = pathloss_2d_ps correction_type = "POINT" else: pathloss_2d = pathloss_2d_un correction_type = "UNIFORM" # Save the corrections. The `data` portion is the correction used. # The individual ones will be saved in the respective attributes. correction = type(data)(data=pathloss_2d) correction.pathloss_point = pathloss_2d_ps correction.pathloss_uniform = pathloss_2d_un correction.wavelength = wavelength_array correction.pathloss_correction_type = correction_type return correction def _corrections_for_lrs(data, pathloss, user_slit_loc): """ Calculate the correction arrays for MIRI LRS slit. Parameters ---------- data : `~stdatamodels.jwst.datamodels.JwstDataModel` The LRS data being operated on. pathloss : `~stdatamodels.jwst.datamodels.MirLrsPathlossModel` The pathloss reference data user_slit_loc : float User-provided slit location in units of arcsec, where ``(0, 0)`` is the center and the edges are +/-0.255 arcsec. Returns ------- correction : `~stdatamodels.jwst.datamodels.JwstDataModel` The correction arrays """ # Get location of target xcenter, ycenter, offset_1, offset_2 = get_center(data.meta.exposure.type, data, offsets=True) log.info(f"Target location relative to aperture center = ({xcenter:.3f}, {ycenter:.3f})") # Get 1-d wavelength vector from reference file data wavelength_vector = pathloss.pathloss_table["wavelength"] # Calculate the 1-d pathloss vector for the source position pathloss_data = pathloss.pathloss_table["pathloss"] pathloss_wcs = pathloss.meta.wcsinfo if user_slit_loc is None: _, pathloss_vector, is_inside_slit = calculate_pathloss_vector( pathloss_data, pathloss_wcs, xcenter, ycenter, calc_wave=False ) else: log.info(f"Correcting location from provided target center offset: {user_slit_loc} arcsec") # The slit is oriented with the long axis (the spatial # axis) horizontal, so the edges in the dispersion direction (the # narrow axis) would be negative down and positive up. Because the # slit is only 510 mas across, the edges would be at about # +/-0.255 arcsec. Hence, the xcenter coordinate remains the same. ra, dec, wav = data.meta.wcs(offset_1, offset_2) location = (ra, dec, wav) # Compute the spatial pixel scale from the WCS. Assume pixels are square. scale_degrees = compute_scale( data.meta.wcs, location, disp_axis=data.meta.wcsinfo.dispersion_direction ) scale_arcsec = scale_degrees * 3600.0 user_slit_loc_pix = user_slit_loc / scale_arcsec yusr_recenter = ycenter + user_slit_loc_pix log.info(f"New target location = ({xcenter:.3f}, {yusr_recenter:.3f})") _, pathloss_vector, is_inside_slit = calculate_pathloss_vector( pathloss_data, pathloss_wcs, xcenter, yusr_recenter, calc_wave=False ) if not is_inside_slit: log.info("Source is outside slit. Correction defaulting to center of the slit.") xcenter, ycenter = 0.0, 0.0 _, pathloss_vector, is_inside_slit = calculate_pathloss_vector( pathloss_data, pathloss_wcs, xcenter, ycenter, calc_wave=False ) # Populate 2-D wavelength array from WCS info wavelength_array = get_wavelengths(data) # MIRI LRS pathloss reference file data are in reverse order, # so flip them here wavelength_vector = wavelength_vector[::-1] pathloss_vector = pathloss_vector[::-1] # Compute the point source pathloss 2D correction pathloss_2d = interpolate_onto_grid(wavelength_array, wavelength_vector, pathloss_vector) # Save the corrections. The `data` portion is the correction used. # The individual ones will be saved in the respective attributes. correction = datamodels.ImageModel(data=pathloss_2d) correction.pathloss_point = pathloss_2d correction.wavelength = wavelength_array return correction