Source code for jwst.extract_1d.extract

import json
import logging
from json.decoder import JSONDecodeError
from pathlib import Path

import numpy as np
from astropy.modeling import polynomial
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels.apcorr import (
    MirLrsApcorrModel,
    MirMrsApcorrModel,
    NisWfssApcorrModel,
    NrcWfssApcorrModel,
    NrsFsApcorrModel,
    NrsIfuApcorrModel,
    NrsMosApcorrModel,
)

from jwst.datamodels import ModelContainer
from jwst.datamodels.utils import attrs_to_group_id
from jwst.datamodels.utils.tso_multispec import make_tso_specmodel
from jwst.extract_1d import extract1d, spec_wcs
from jwst.extract_1d.apply_apcorr import select_apcorr
from jwst.extract_1d.psf_profile import psf_profile
from jwst.extract_1d.source_location import location_from_wcs
from jwst.lib import pipe_utils
from jwst.lib.wcs_utils import get_wavelengths

__all__ = [
    "run_extract1d",
    "read_extract1d_ref",
    "read_apcorr_ref",
    "get_extract_parameters",
    "box_profile",
    "aperture_center",
    "shift_by_offset",
    "define_aperture",
    "extract_one_slit",
    "create_extraction",
]


log = logging.getLogger(__name__)

WFSS_EXPTYPES = ["NIS_WFSS", "NRC_WFSS", "NRC_GRISM"]
"""Exposure types to be regarded as wide-field slitless spectroscopy."""

SRCPOS_EXPTYPES = ["MIR_LRS-FIXEDSLIT", "NRS_FIXEDSLIT", "NRS_MSASPEC", "NRS_BRIGHTOBJ"]
"""Exposure types for which source position can be estimated."""

ANY = "ANY"
"""Wildcard for slit name.

Extended summary
----------------
For full-frame input data, keyword SLTNAME may not be populated, so the
slit name will be set to this string to indicate that the first slit in
the reference file should be used.
A slit name in the extract1d reference file can also be ANY, in which case the
reference information for that slit will be regarded as matching any slit
name from the input data.
"""

HORIZONTAL = 1
"""Horizontal dispersion axis."""
VERTICAL = 2
"""Vertical dispersion axis."""

# These values are assigned in get_extract_parameters, using key "match".
# If there was an aperture in the reference file for which the "id" key matched,
# that's (at least) a partial match.
# If "spectral_order" also matched, that's an exact match.
NO_MATCH = "no match"
PARTIAL = "partial match"
EXACT = "exact match"


class ContinueError(Exception):
    """Custom error to pass continue from a function inside a loop."""

    pass


def read_extract1d_ref(refname):
    """
    Read the extract1d reference file.

    Parameters
    ----------
    refname : str
        The name of the extract1d reference file, or 'N/A'.
        If specified, this file is expected to be a JSON file
        giving extraction information.

    Returns
    -------
    ref_dict : dict or None
        If the extract1d reference file is specified, ref_dict will be the
        dictionary returned by json.load().
    """
    refname_type = refname[-4:].lower()
    if refname == "N/A":
        ref_dict = None
    else:
        # If specified, the extract1d reference file can only be in JSON format
        if refname_type == "json":
            fd = Path(refname).open()
            try:
                ref_dict = json.load(fd)
                fd.close()
            except (UnicodeDecodeError, JSONDecodeError):
                # Input file does not load correctly as a JSON file.
                # Probably an error in the JSON file.
                fd.close()
                log.error(
                    "Extract1d JSON reference file has an error. "
                    "Run a JSON validator offline and fix the file."
                )
                raise RuntimeError("Invalid JSON extract1d reference file.") from None
        else:
            log.error("Invalid Extract1d reference file: must be JSON.")
            raise RuntimeError("Invalid extract1d reference file: must be JSON.")

    return ref_dict
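
# A minimal usage sketch for read_extract1d_ref (the file name below is
# hypothetical): load a JSON reference file and inspect the apertures it
# defines.
#
#     ref_dict = read_extract1d_ref("jwst_nirspec_extract1d_0001.json")
#     if ref_dict is not None:
#         for aper in ref_dict["apertures"]:
#             print(aper.get("id"), aper.get("spectral_order", "ANY"))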


def read_apcorr_ref(refname, exptype):
    """
    Read the apcorr reference file.

    Determine the appropriate DataModel class to use for an APCORR reference
    file and read the file into it.

    Parameters
    ----------
    refname : str
        Path to the APCORR reference file.
    exptype : str
        EXP_TYPE of the input to the extract_1d step.

    Returns
    -------
    DataModel
        A datamodel containing the reference file input.
    """
    apcorr_model_map = {
        "MIR_LRS-FIXEDSLIT": MirLrsApcorrModel,
        "MIR_LRS-SLITLESS": MirLrsApcorrModel,
        "MIR_MRS": MirMrsApcorrModel,
        "NRC_GRISM": NrcWfssApcorrModel,
        "NRC_WFSS": NrcWfssApcorrModel,
        "NIS_WFSS": NisWfssApcorrModel,
        "NRS_BRIGHTOBJ": NrsFsApcorrModel,
        "NRS_FIXEDSLIT": NrsFsApcorrModel,
        "NRS_IFU": NrsIfuApcorrModel,
        "NRS_MSASPEC": NrsMosApcorrModel,
    }

    apcorr_model = apcorr_model_map[exptype]
    return apcorr_model(refname)
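
# Usage sketch for read_apcorr_ref (hypothetical file name): the EXP_TYPE
# string selects the datamodel class used to open the reference file.
#
#     apcorr_ref = read_apcorr_ref("jwst_nirspec_apcorr_0001.fits", "NRS_FIXEDSLIT")
#     # apcorr_ref is an NrsFsApcorrModel instance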


def get_extract_parameters(
    ref_dict,
    input_model,
    slitname,
    sp_order,
    meta,
    smoothing_length=None,
    bkg_fit=None,
    bkg_order=None,
    subtract_background=None,
    use_source_posn=None,
    position_offset=0.0,
    model_nod_pair=False,
    optimize_psf_location=False,
    extraction_type="box",
    psf_ref_name="N/A",
):
    """
    Get extraction parameter values.

    Parameters
    ----------
    ref_dict : dict or None
        For an extract1d reference file in JSON format, ``ref_dict`` will be
        the entire contents of the file. If there is no extract1d reference
        file, ``ref_dict`` will be None.
    input_model : JWSTDataModel
        This can be either the input science file or one SlitModel out of
        a list of slits.
    slitname : str
        The name of the slit, or "ANY".
    sp_order : int
        The spectral order number.
    meta : ObjectNode
        The metadata for the actual input model, i.e. not just for the
        current slit, from input_model.meta.
    smoothing_length : int or None, optional
        Width of a boxcar function for smoothing the background regions.
        If None, the smoothing length will be retrieved from ``ref_dict``,
        or it will be set to 0 (no background smoothing) if this key is
        not found in ``ref_dict``. If ``smoothing_length`` is not None,
        that means that the user explicitly specified the value, so that
        value will be used. This argument is only used if background
        regions have been specified.
    bkg_fit : str or None, optional
        The type of fit to apply to background values in each column
        (or row, if the dispersion is vertical). The default 'poly'
        results in a polynomial fit of order ``bkg_order``. Other options
        are 'mean' and 'median'. If 'mean' or 'median' is selected,
        ``bkg_order`` is ignored.
    bkg_order : int or None, optional
        Polynomial order for fitting to each column (or row, if the
        dispersion is vertical) of background. If None, the polynomial
        order will be retrieved from ``ref_dict``, or it will be set to 0
        if not found in ``ref_dict``. A value of 0 means that a simple
        average of the background regions, column by column (or row by
        row), will be used. If ``bkg_order`` is not None, that means that
        the user explicitly specified the value, so that value will be
        used. This argument must be positive or zero, and it is only used
        if background regions have been specified.
    subtract_background : bool or None, optional
        If False, all background parameters will be ignored.
    use_source_posn : bool or None, optional
        If True, the target and background positions specified in
        ``ref_dict`` (or a default target position) will be shifted to
        account for the actual source location in the data. If None,
        a default value will be set, based on the exposure type.
    position_offset : float or None, optional
        Pixel offset to apply to the nominal source location. If None,
        the value specified in ``ref_dict`` will be used or it will
        default to 0.
    model_nod_pair : bool, optional
        If True, and if ``extraction_type`` is 'optimal', then a negative
        trace from nod subtraction will be modeled alongside the positive
        source, if possible.
    optimize_psf_location : bool
        If True, and if ``extraction_type`` is 'optimal', then the source
        location will be optimized, via iterative comparisons of the scene
        model with the input data.
    extraction_type : str, optional
        Extraction type ('box' or 'optimal'). Optimal extraction is only
        available if ``psf_ref_name`` is not 'N/A'. If set to None,
        optimal extraction will be used if ``use_source_posn`` is True.
    psf_ref_name : str, optional
        The name of the PSF reference file, or "N/A".

    Returns
    -------
    extract_params : dict
        Information copied from ``ref_dict``. The items will be selected
        based on ``slitname`` and ``sp_order``. Default values will be
        assigned if ``ref_dict`` is None.
    """
    extract_params = {"match": NO_MATCH}  # initial value
    if ref_dict is None:
        # There is no extract1d reference file; use "reasonable" default values.
        extract_params["match"] = EXACT
        shape = input_model.data.shape
        extract_params["spectral_order"] = sp_order
        extract_params["xstart"] = 0  # first pixel in X
        extract_params["xstop"] = shape[-1] - 1  # last pixel in X
        extract_params["ystart"] = 0  # first pixel in Y
        extract_params["ystop"] = shape[-2] - 1  # last pixel in Y
        extract_params["extract_width"] = None
        extract_params["src_coeff"] = None
        extract_params["bkg_coeff"] = None  # no background sub.
        extract_params["smoothing_length"] = 0  # because no background sub.
        extract_params["bkg_fit"] = None  # because no background sub.
        extract_params["bkg_order"] = 0  # because no background sub.
        extract_params["subtract_background"] = False
        extract_params["extraction_type"] = "box"
        extract_params["use_source_posn"] = False  # no source position correction
        extract_params["position_offset"] = 0.0
        extract_params["model_nod_pair"] = False
        extract_params["optimize_psf_location"] = False
        extract_params["psf"] = "N/A"
        extract_params["independent_var"] = "pixel"
        extract_params["trace"] = None
        # Note that extract_params['dispaxis'] is not assigned.
        # This will be done later, possibly slit by slit.

    else:
        for aper in ref_dict["apertures"]:
            if (
                "id" in aper
                and aper["id"] != "dummy"
                and (aper["id"] == slitname or aper["id"] == ANY or slitname == ANY)
            ):
                # region_type is retained for backward compatibility; it is
                # not required to be present, but if it is and is not set
                # to target, then the aperture is not matched.
                region_type = aper.get("region_type", "target")
                if region_type != "target":
                    continue

                extract_params["match"] = PARTIAL

                # spectral_order is a secondary selection criterion. The
                # default is the expected value, so if the key is not present
                # in the JSON file, the current aperture will be selected.
                # If the current aperture in the JSON file has
                # "spectral_order": "ANY", that aperture will be selected.
                spectral_order = aper.get("spectral_order", sp_order)

                if spectral_order == sp_order or spectral_order == ANY:
                    extract_params["match"] = EXACT
                    extract_params["spectral_order"] = sp_order
                    # Note: extract_params['dispaxis'] is not assigned.
                    # This is done later, possibly slit by slit.

                    # Set default start/stop by shape if not specified
                    shape = input_model.data.shape
                    extract_params["xstart"] = aper.get("xstart", 0)
                    extract_params["xstop"] = aper.get("xstop", shape[-1] - 1)
                    extract_params["ystart"] = aper.get("ystart", 0)
                    extract_params["ystop"] = aper.get("ystop", shape[-2] - 1)

                    extract_params["src_coeff"] = aper.get("src_coeff")
                    extract_params["bkg_coeff"] = aper.get("bkg_coeff")
                    if extract_params["bkg_coeff"] is not None and subtract_background is None:
                        subtract_background = True

                    if subtract_background:
                        extract_params["subtract_background"] = True
                        if bkg_fit is not None:
                            # Mean value for background fitting is equivalent
                            # to a polynomial fit with order 0.
                            if bkg_fit == "mean":
                                bkg_fit = "poly"
                                bkg_order = 0
                            extract_params["bkg_fit"] = bkg_fit
                        else:
                            extract_params["bkg_fit"] = aper.get("bkg_fit", "poly")
                    else:
                        extract_params["subtract_background"] = False
                        extract_params["bkg_fit"] = None

                    extract_params["independent_var"] = aper.get("independent_var", "pixel").lower()

                    if bkg_order is None:
                        extract_params["bkg_order"] = aper.get("bkg_order", 0)
                    else:
                        # If the user supplied a value, use that value.
                        extract_params["bkg_order"] = bkg_order

                    # Set use_source_posn based on hierarchy of priorities:
                    # parameter value on the command line is highest precedence,
                    # then parameter value from the extract1d reference file,
                    # and finally a default setting based on exposure type.
                    # value from the ref file
                    use_source_posn_aper = aper.get("use_source_posn", None)
                    if use_source_posn is None:  # no value set on command line
                        if use_source_posn_aper is None:
                            # no value set in ref file - use a suitable default
                            if meta.exposure.type in SRCPOS_EXPTYPES:
                                use_source_posn = True
                                log.info(
                                    f"Turning on source position correction "
                                    f"for exp_type = {meta.exposure.type}"
                                )
                            else:
                                use_source_posn = False
                        else:
                            # use the value from the ref file
                            use_source_posn = use_source_posn_aper
                    extract_params["use_source_posn"] = use_source_posn
                    extract_params["position_offset"] = position_offset
                    extract_params["trace"] = None

                    extract_params["extract_width"] = aper.get("extract_width")

                    if smoothing_length is None:
                        extract_params["smoothing_length"] = aper.get("smoothing_length", 0)
                    else:
                        # If the user supplied a value, use that value.
                        extract_params["smoothing_length"] = smoothing_length

                    # Check that the smoothing length has a reasonable value
                    sm_length = extract_params["smoothing_length"]
                    if sm_length != int(sm_length):
                        sm_length = int(np.round(sm_length))
                        log.warning(f"Smoothing length must be an integer. Rounding to {sm_length}")
                    if sm_length != 0:
                        if sm_length < 3:
                            log.warning(
                                f"Smoothing length {sm_length} is not allowed. Setting it to 0."
                            )
                            sm_length = 0
                        elif sm_length % 2 == 0:
                            sm_length -= 1
                            log.warning(
                                "Even smoothing lengths are not supported. "
                                "Rounding the smoothing length down to "
                                f"{sm_length}."
                            )
                    extract_params["smoothing_length"] = sm_length

                    extract_params["psf"] = psf_ref_name
                    extract_params["optimize_psf_location"] = optimize_psf_location
                    extract_params["model_nod_pair"] = model_nod_pair

                    # Check for a valid PSF file
                    extraction_type = str(extraction_type).lower()
                    if extract_params["psf"] == "N/A" and extraction_type != "box":
                        log.warning("No PSF file available. Setting extraction type to 'box'.")
                        extraction_type = "box"

                    # Set the extraction type to 'box' or 'optimal'
                    if extraction_type == "none":
                        if extract_params["use_source_posn"]:
                            extract_params["extraction_type"] = "optimal"
                        else:
                            extract_params["extraction_type"] = "box"
                        log.info(
                            f"Using extraction type '{extract_params['extraction_type']}' "
                            f"for use_source_posn = {extract_params['use_source_posn']}"
                        )
                    else:
                        extract_params["extraction_type"] = extraction_type

                    break

    return extract_params


def log_initial_parameters(extract_params):
    """
    Log the initial extraction parameters.

    Parameters
    ----------
    extract_params : dict
        Information read from the reference file.
    """
    if "dispaxis" not in extract_params:
        return

    log.debug("Extraction parameters:")
    skip_keys = {"match", "trace"}
    for key, value in extract_params.items():
        if key not in skip_keys:
            log.debug(f"  {key} = {value}")


def create_poly(coeff):
    """
    Create a polynomial model from coefficients.

    Parameters
    ----------
    coeff : list of float
        The coefficients of the polynomial, constant term first, highest
        order term last.

    Returns
    -------
    `astropy.modeling.polynomial.Polynomial1D` or None
        None is returned if ``coeff`` is empty.
    """
    n = len(coeff)
    if n < 1:
        return None

    coeff_dict = {f"c{i}": coeff[i] for i in range(n)}
    return polynomial.Polynomial1D(degree=n - 1, **coeff_dict)


def populate_time_keywords(input_model, output_model):
    """
    Copy the integration time keywords to header keywords.

    Parameters
    ----------
    input_model : TSOMultiSpecModel
        The input science model.
    output_model : TSOMultiSpecModel
        The output science model. This may be modified in-place.
    """
    nints = input_model.meta.exposure.nints
    int_start = input_model.meta.exposure.integration_start

    if hasattr(input_model, "data"):
        shape = input_model.data.shape
        if len(shape) == 2:
            num_integ = 1
        else:  # len(shape) == 3
            num_integ = shape[0]
    else:  # e.g. MultiSlit data
        num_integ = 1

    # This assumes that the spec attribute of output_model has already been
    # created, and spectra have been appended to the spec_table
    n_output_spec = 0
    for spec in output_model.spec:
        n_output_spec += len(spec.spec_table)

    # num_j is the number of spectral tables, e.g. the number
    # of fixed-slit spectra, MSA spectra, or different
    # spectral orders; num_integ is the number of integrations.
    # The total number of output spectra is n_output_spec = num_integ * num_j
    num_j = len(output_model.spec)

    # check that the number of spectra present matches expectations
    if n_output_spec != num_j * num_integ:
        log.warning(
            f"populate_time_keywords: Don't understand n_output_spec = {n_output_spec}, "
            f"num_j = {num_j}, num_integ = {num_integ}"
        )
    else:
        log.debug(
            f"Number of output spectra = {n_output_spec}; "
            f"number of spectra for each integration = {num_j}; "
            f"number of integrations = {num_integ}"
        )

    if int_start is None:
        log.warning("INTSTART not found; assuming a value of 1.")
        int_start = 1
    int_start -= 1  # zero indexed

    int_end = input_model.meta.exposure.integration_end
    if int_end is None:
        log.warning(f"INTEND not found; assuming a value of {nints}.")
        int_end = nints
    int_end -= 1  # zero indexed

    if nints > 1:
        num_integrations = int_end - int_start + 1
    else:
        num_integrations = 1

    if hasattr(input_model, "int_times") and input_model.int_times is not None:
        nrows = len(input_model.int_times)
    else:
        nrows = 0

    if nrows < 1:
        log.warning(
            "There is no INT_TIMES table in the input file - "
            "Making best guess on integration numbers."
        )
        # Set a simple integration index
        for spec in output_model.spec:
            spec.spec_table["INT_NUM"] = np.arange(1, len(spec.spec_table) + 1)
        return

    # If we have a single plane (e.g. ImageModel or MultiSlitModel),
    # we will only populate the keywords if the corresponding uncal file
    # had one integration.
    # If the data were or might have been segmented, we use the first and
    # last integration numbers to determine whether the data were in fact
    # averaged over integrations, and if so, we should not populate the
    # int_times-related header keywords.
    skip = False  # initial value

    if isinstance(input_model, (datamodels.MultiSlitModel, datamodels.ImageModel)):
        if num_integrations > 1:
            log.warning(
                "Not using INT_TIMES table because the data have been averaged over integrations."
            )
            skip = True
    elif isinstance(input_model, (datamodels.CubeModel, datamodels.SlitModel)):
        shape = input_model.data.shape
        if len(shape) == 2 and num_integrations > 1:
            log.warning(
                "Not using INT_TIMES table because the data have been averaged over integrations."
            )
            skip = True
        elif len(shape) != 3 or shape[0] > nrows:
            # Later, we'll check that the integration_number column actually
            # has a row corresponding to every integration in the input.
            log.warning(
                "Not using INT_TIMES table because the data shape is "
                "not consistent with the number of table rows."
            )
            skip = True
    elif isinstance(input_model, datamodels.IFUCubeModel):
        log.warning("The INT_TIMES table will be ignored for IFU data.")
        skip = True

    if skip:
        return

    int_num = input_model.int_times["integration_number"]
    start_time_mjd = input_model.int_times["int_start_MJD_UTC"]
    mid_time_mjd = input_model.int_times["int_mid_MJD_UTC"]
    end_time_mjd = input_model.int_times["int_end_MJD_UTC"]
    start_tdb = input_model.int_times["int_start_BJD_TDB"]
    mid_tdb = input_model.int_times["int_mid_BJD_TDB"]
    end_tdb = input_model.int_times["int_end_BJD_TDB"]

    # Inclusive range of integration numbers in the input data, zero indexed.
    data_range = (int_start, int_end)

    # Inclusive range of integration numbers in the INT_TIMES table, zero indexed.
    table_range = (int_num[0] - 1, int_num[-1] - 1)
    offset = data_range[0] - table_range[0]

    data_range_test = data_range[0] < table_range[0] or data_range[1] > table_range[1]
    if data_range_test or offset > table_range[1]:
        log.warning(
            "Not using the INT_TIMES table because it does not include "
            "rows for all integrations in the data."
        )
        return

    log.debug("TSO data, so copying times from the INT_TIMES table.")

    for j in range(num_j):  # for each slit or order (EXTRACT1D extension)
        spec = output_model.spec[j]
        spec.time_scale = "UTC"
        for k in range(len(spec.spec_table)):  # for each integration (table row)
            row = k + offset
            spec.spec_table["INT_NUM"][k] = int_num[row]
            spec.spec_table["MJD-BEG"][k] = start_time_mjd[row]
            spec.spec_table["MJD-AVG"][k] = mid_time_mjd[row]
            spec.spec_table["MJD-END"][k] = end_time_mjd[row]
            spec.spec_table["TDB-BEG"][k] = start_tdb[row]
            spec.spec_table["TDB-MID"][k] = mid_tdb[row]
            spec.spec_table["TDB-END"][k] = end_tdb[row]


def get_spectral_order(slit):
    """
    Get the spectral order number.

    Parameters
    ----------
    slit : SlitModel
        One slit from an input MultiSlitModel or similar.

    Returns
    -------
    int
        Spectral order number for ``slit``. If no information about spectral
        order is available in ``wcsinfo``, a default value of 1 will be
        returned.
    """
    sp_order = slit.meta.wcsinfo.spectral_order
    if sp_order is None:
        log.warning("spectral_order is None; using 1")
        sp_order = 1

    return sp_order


def is_prism(input_model):
    """
    Determine whether the current observing mode used a prism.

    The reason for this test is so we can skip spectral extraction if the
    spectral order is zero and the exposure was not made using a prism.
    In this context, therefore, a grism is not considered to be a prism.

    Parameters
    ----------
    input_model : JWSTDataModel
        The input science model.

    Returns
    -------
    bool
        True if the exposure used a prism; False otherwise.
    """
    instrument = input_model.meta.instrument.name
    if instrument is None:
        return False

    instrument_filter = input_model.meta.instrument.filter
    if instrument_filter is None:
        instrument_filter = "NONE"
    else:
        instrument_filter = instrument_filter.upper()

    grating = input_model.meta.instrument.grating
    if grating is None:
        grating = "NONE"
    else:
        grating = grating.upper()

    prism_mode = False
    if (instrument == "MIRI" and instrument_filter.find("P750L") >= 0) or (
        instrument == "NIRSPEC" and grating.find("PRISM") >= 0
    ):
        prism_mode = True

    return prism_mode


def copy_keyword_info(slit, slitname, spec):
    """
    Copy metadata from the input to the output spectrum.

    Parameters
    ----------
    slit : A SlitModel object
        Metadata will be copied from the input ``slit`` to output ``spec``.
    slitname : str or None
        The name of the slit.
    spec : One element of MultiSpecModel.spec
        Metadata attributes will be updated in-place.
    """
    if slitname is not None and slitname != "ANY":
        spec.name = slitname

    # Copy over some attributes if present, even if they are None
    copy_attributes = [
        "slitlet_id",
        "source_id",
        "source_xpos",
        "source_ypos",
        "source_ra",
        "source_dec",
        "shutter_state",
    ]
    for key in copy_attributes:
        if hasattr(slit, key):
            setattr(spec, key, getattr(slit, key))

    # Copy over some attributes only if they are present and not None
    copy_populated_attributes = [
        "source_name",
        "source_alias",
        "source_type",
        "stellarity",
        "quadrant",
        "slit_xscale",
        "slit_yscale",
        "wavelength_corrected",
        "pathloss_correction_type",
        "barshadow_corrected",
    ]
    for key in copy_populated_attributes:
        if getattr(slit, key, None) is not None:
            setattr(spec, key, getattr(slit, key))


def _set_weight_from_limits(profile, idx, lower_limit, upper_limit, allow_partial=True):
    """
    Set profile pixel weighting from a lower and upper limit.

    Pixels to be fully included in the aperture are set to 1.0. Pixels
    partially included are set to fractional values.

    The profile is updated in place, so aperture setting is cumulative.
    If there are overlapping apertures specified, later ones will overwrite
    earlier values.

    Parameters
    ----------
    profile : ndarray of float
        The spatial profile to update.
    idx : ndarray of int
        Index values for the profile array, corresponding to the
        cross-dispersion axis. Dimensions must match ``profile`` shape.
    lower_limit : float or ndarray of float
        Lower limit for the aperture. If not a single value, dimensions
        must match ``profile`` shape.
    upper_limit : float or ndarray of float
        Upper limit for the aperture. If not a single value, dimensions
        must match ``profile`` shape.
    allow_partial : bool, optional
        If True, partial pixel weights are set where the pixel index
        intersects the limit values. If False, only whole integer weights
        are set.
    """
    # Both limits are inclusive
    profile[(idx >= lower_limit) & (idx <= upper_limit)] = 1.0

    if allow_partial:
        # For each pixel, get the distance from the lower and upper limits,
        # to set partial pixel weights to the fraction of the pixel included.
        # For example, if the lower limit is 1.7, then the weight of pixel 1
        # should be set to 0.3. If the upper limit is 10.7, then the weight of
        # pixel 11 should be set to 0.7.
        lower_weight = idx - lower_limit + 1
        upper_weight = upper_limit - idx + 1

        for partial_pixel_weight in [lower_weight, upper_weight]:
            # Check for overlap values that are between 0 and 1, for which
            # the profile does not already contain a higher weight
            test = (
                (partial_pixel_weight > 0)
                & (partial_pixel_weight < 1)
                & (profile < partial_pixel_weight)
            )

            # Set these values to the partial pixel weight
            profile[test] = partial_pixel_weight[test]
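
# Worked example for _set_weight_from_limits with inclusive, fractional
# limits (values follow directly from the formulas above):
#
#     profile = np.zeros(6)
#     idx = np.arange(6)
#     _set_weight_from_limits(profile, idx, 1.7, 3.2)
#     # profile -> [0.0, 0.3, 1.0, 1.0, 0.2, 0.0]
#     # pixel 1 overlaps the aperture by 0.3 pixels, pixel 4 by 0.2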


def box_profile(
    shape, extract_params, wl_array, coefficients="src_coeff", label="aperture", return_limits=False
):
    """
    Create a spatial profile for box extraction.

    The output profile is an image matching the input data shape,
    containing weights for each pixel in the image. Pixels to be fully
    included in an extraction aperture are set to 1.0. Pixels partially
    included are set to values between 0 and 1, depending on the overlap
    with the aperture limits. Pixels not included in the aperture are
    set to 0.0.

    Upper and lower limits for the aperture are determined from the
    ``extract_params``, in this priority order:

        1. src_coeff upper and lower limits (or bkg_coeff, for a background profile)
        2. trace +/- extraction width / 2
        3. center of start/stop values +/- extraction width / 2
        4. cross-dispersion start/stop values
        5. array limits.

    Left and right limits are set from start/stop values only.

    Only a single aperture is supported at this time. The 'src_coeff' and
    'bkg_coeff' specifications allow multiple regions to be specified, but
    all regions are considered part of the same aperture and are added to
    the same profile, to be extracted at once.

    Parameters
    ----------
    shape : tuple of int
        Data shape for the output profile, to match the spectral image.
    extract_params : dict
        Extraction parameters, as returned from `get_extract_parameters`.
    wl_array : ndarray
        Array of wavelength values, matching ``shape``, for each pixel in
        the array.
    coefficients : {'src_coeff', 'bkg_coeff'}, optional
        The polynomial coefficients to look for in the ``extract_params``
        dictionary. If 'bkg_coeff', the output aperture contains background
        regions; otherwise, it contains target source regions.
    label : str, optional
        A label to use for the aperture, while logging limit values.
    return_limits : bool, optional
        If True, an upper and lower limit value for the aperture are
        returned along with the spatial profile. These are used for
        recording the aperture extent in output metadata. For apertures
        set from polynomial coefficients, the returned values are averages
        of the true upper and lower limits, which may vary by dispersion
        element.

    Returns
    -------
    profile : ndarray of float
        Aperture weights to use in box extraction from a spectral image.
    lower_limit : float, optional
        Average lower limit for the aperture. Returned only if
        ``return_limits`` is set.
    upper_limit : float, optional
        Average upper limit for the aperture. Returned only if
        ``return_limits`` is set.
    """
    # Get pixel index values for the array
    yidx, xidx = np.mgrid[: shape[0], : shape[1]]
    if extract_params["dispaxis"] == HORIZONTAL:
        dval = yidx
    else:
        dval = xidx

    # Get start/stop values from parameters if present,
    # or default to data shape
    xstart = extract_params.get("xstart", 0)
    xstop = extract_params.get("xstop", shape[1] - 1)
    ystart = extract_params.get("ystart", 0)
    ystop = extract_params.get("ystop", shape[0] - 1)

    # Check if the profile should contain partial pixel weights
    if coefficients == "bkg_coeff" and extract_params["bkg_fit"] == "median":
        allow_partial = False
    else:
        allow_partial = True

    # Set aperture region, in this priority order:
    # 1. src_coeff upper and lower limits (or bkg_coeff, for background profile)
    # 2. trace +/- extraction width / 2
    # 3. center of start/stop values +/- extraction width / 2
    # 4. start/stop values
    profile = np.full(shape, 0.0)
    if extract_params[coefficients] is not None:
        # Limits from source coefficients: ignore ystart/stop/width
        if extract_params["independent_var"].startswith("wavelength"):
            ival = wl_array
        elif extract_params["dispaxis"] == HORIZONTAL:
            ival = xidx.astype(np.float32)
        else:
            ival = yidx.astype(np.float32)

        # The source extraction can include more than one region,
        # but must contain pairs of lower and upper limits.
        n_src_coeff = len(extract_params[coefficients])
        if n_src_coeff % 2 != 0:
            raise RuntimeError(
                f"{coefficients} must contain alternating lists of lower and upper limits."
            )

        lower = None
        lower_limit = None
        upper_limit = None
        for i, coeff_list in enumerate(extract_params[coefficients]):
            if i % 2 == 0:
                lower = create_poly(coeff_list)
            else:
                upper = create_poly(coeff_list)

                # NOTE: source coefficients currently have a different
                # definition for pixel inclusion than the start/stop input.
                # Source coefficients define 0 at the center of the pixel,
                # start/stop defines 0 at the start of the lower pixel and
                # includes the upper pixel. Here, we are setting limits
                # in the spatial profile according to the more commonly used
                # start/stop definition, so we need to modify the lower and
                # upper limits from polynomial coefficients to match.
                lower_limit_region = lower(ival) + 0.5
                upper_limit_region = upper(ival) - 0.5

                _set_weight_from_limits(
                    profile,
                    dval,
                    lower_limit_region,
                    upper_limit_region,
                    allow_partial=allow_partial,
                )
                mean_lower = np.mean(lower_limit_region)
                mean_upper = np.mean(upper_limit_region)
                log.info(
                    f"Mean {label} start/stop from {coefficients}: "
                    f"{mean_lower:.2f} -> {mean_upper:.2f} (inclusive)"
                )

                if lower_limit is None:
                    lower_limit = mean_lower
                    upper_limit = mean_upper
                else:
                    if mean_lower < lower_limit:
                        lower_limit = mean_lower
                    if mean_upper > upper_limit:
                        upper_limit = mean_upper

    elif extract_params["extract_width"] is not None and extract_params["trace"] is not None:
        width = extract_params["extract_width"]
        trace = extract_params["trace"]
        if extract_params["dispaxis"] != HORIZONTAL:
            trace = np.tile(trace, (shape[1], 1)).T

        lower_limit_region = trace - (width - 1.0) / 2.0
        upper_limit_region = lower_limit_region + width - 1

        _set_weight_from_limits(profile, dval, lower_limit_region, upper_limit_region)

        lower_limit = np.nanmean(lower_limit_region)
        upper_limit = np.nanmean(upper_limit_region)
        log.info(
            f"Mean {label} start/stop from trace: "
            f"{lower_limit:.2f} -> {upper_limit:.2f} (inclusive)"
        )

    elif extract_params["extract_width"] is not None:
        # Limits from extraction width at center of ystart/stop if present,
        # center of array if not
        if extract_params["dispaxis"] == HORIZONTAL:
            nominal_middle = (ystart + ystop) / 2.0
        else:
            nominal_middle = (xstart + xstop) / 2.0

        width = extract_params["extract_width"]
        lower_limit = nominal_middle - (width - 1.0) / 2.0
        upper_limit = lower_limit + width - 1

        _set_weight_from_limits(profile, dval, lower_limit, upper_limit)
        log.info(
            f"{label.capitalize()} start/stop: {lower_limit:.2f} -> {upper_limit:.2f} (inclusive)"
        )

    else:
        # Limits from start/stop only, defaulting to the full array
        # if not specified
        if extract_params["dispaxis"] == HORIZONTAL:
            lower_limit = ystart
            upper_limit = ystop
        else:
            lower_limit = xstart
            upper_limit = xstop

        _set_weight_from_limits(profile, dval, lower_limit, upper_limit)
        log.info(
            f"{label.capitalize()} start/stop: {lower_limit:.2f} -> {upper_limit:.2f} (inclusive)"
        )

    # Set weights to zero outside left and right limits
    if extract_params["dispaxis"] == HORIZONTAL:
        profile[:, : int(np.ceil(xstart))] = 0
        profile[:, int(np.floor(xstop)) + 1 :] = 0
    else:
        profile[: int(np.ceil(ystart)), :] = 0
        profile[int(np.floor(ystop)) + 1 :, :] = 0

    if return_limits:
        return profile, lower_limit, upper_limit
    return profile
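
# Box-profile sketch for a fixed-width aperture (all inputs hypothetical;
# "dispaxis" must already be set, as done in create_extraction):
#
#     shape = (32, 100)
#     wl_array = np.broadcast_to(np.linspace(1.0, 5.0, 100), shape)
#     params = {
#         "dispaxis": HORIZONTAL, "src_coeff": None, "bkg_fit": None,
#         "independent_var": "pixel", "trace": None, "extract_width": 6,
#         "ystart": 10, "ystop": 20,
#     }
#     profile, lo, hi = box_profile(shape, params, wl_array, return_limits=True)
#     # lo, hi -> 12.5, 17.5: a 6-pixel window centered at (10 + 20) / 2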


def aperture_center(profile, dispaxis=1, middle_pix=None):
    """
    Determine the nominal center of an aperture.

    The center is determined from a weighted average of the pixel
    coordinates, where the weights are set by the profile image.

    If ``middle_pix`` is specified, it is expected to be the dispersion
    element at which the cross-dispersion center should be determined.
    In this case, the profile must contain some non-zero elements at that
    dispersion coordinate.

    If ``dispaxis`` is 1 (the default), the return values are in (y,x)
    order. Otherwise, the return values are in (x,y) order.

    Parameters
    ----------
    profile : ndarray
        Pixel weights for the aperture.
    dispaxis : int, optional
        If 1, the dispersion axis is horizontal. Otherwise, the dispersion
        axis is vertical.
    middle_pix : int or None
        Index value for the center of the slit along the dispersion axis.
        If specified, it is returned as ``spec_center``.

    Returns
    -------
    slit_center : float
        Index value for the center of the aperture along the
        cross-dispersion axis.
    spec_center : float
        Index value for the center of the aperture along the
        dispersion axis.
    """
    weights = profile.copy()
    weights[weights <= 0] = 0.0

    yidx, xidx = np.mgrid[: profile.shape[0], : profile.shape[1]]
    if dispaxis == HORIZONTAL:
        spec_idx = xidx
        if middle_pix is not None:
            slit_idx = yidx[:, middle_pix]
            weights = weights[:, middle_pix]
        else:
            slit_idx = yidx
    else:
        spec_idx = yidx
        if middle_pix is not None:
            slit_idx = xidx[middle_pix, :]
            weights = weights[middle_pix, :]
        else:
            slit_idx = xidx

    if np.sum(weights) == 0:
        weights[:] = 1

    slit_center = np.average(slit_idx, weights=weights)
    if middle_pix is not None:
        spec_center = middle_pix
    else:
        spec_center = np.average(spec_idx, weights=weights)

    # if dispaxis == 1 (default), this returns center_y, center_x
    return slit_center, spec_center
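
# aperture_center sketch: with uniform weights over rows 10-20, the
# cross-dispersion center is their midpoint.
#
#     profile = np.zeros((32, 100))
#     profile[10:21] = 1.0
#     center, spec_center = aperture_center(profile, dispaxis=HORIZONTAL)
#     # center -> 15.0 (y), spec_center -> 49.5 (x)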


def shift_by_offset(offset, extract_params, update_trace=True):
    """
    Shift the nominal extraction parameters by a pixel offset.

    Start, stop, and polynomial coefficient values for source and
    background are updated in place in the ``extract_params`` dictionary.
    The source trace value, if present, is also updated if desired.

    Parameters
    ----------
    offset : float
        Cross-dispersion offset to apply, in pixels.
    extract_params : dict
        Extraction parameters to update, as created by
        `get_extract_parameters`.
    update_trace : bool
        If True, the trace in ``extract_params['trace']`` is also updated
        if present.
    """
    # Shift polynomial coefficients
    coeff_params = ["src_coeff", "bkg_coeff"]
    for params in coeff_params:
        if extract_params[params] is not None:
            for coeff_list in extract_params[params]:
                coeff_list[0] += offset

    # Shift start/stop values
    if extract_params["dispaxis"] == HORIZONTAL:
        start_stop_params = ["ystart", "ystop"]
    else:
        start_stop_params = ["xstart", "xstop"]
    for params in start_stop_params:
        if extract_params[params] is not None:
            extract_params[params] += offset

    # Shift the full trace
    if update_trace and extract_params["trace"] is not None:
        extract_params["trace"] += offset
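
# shift_by_offset sketch (hypothetical parameter dict): a +2.5 pixel
# cross-dispersion shift moves the start/stop values and the constant term
# of any polynomial limits.
#
#     params = {"dispaxis": HORIZONTAL, "src_coeff": [[9.5], [15.5]],
#               "bkg_coeff": None, "ystart": 10, "ystop": 16, "trace": None}
#     shift_by_offset(2.5, params)
#     # params["ystart"], params["ystop"] -> 12.5, 18.5
#     # params["src_coeff"] -> [[12.0], [18.0]]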


def define_aperture(input_model, slit, extract_params, exp_type):
    """
    Define an extraction aperture from input parameters.

    Parameters
    ----------
    input_model : DataModel
        The input science model containing metadata information.
    slit : DataModel or None
        One slit from a MultiSlitModel (or similar), or None.
        The spectral image and WCS information will be retrieved from
        ``slit`` unless ``slit`` is None. In that case, they will be
        retrieved from ``input_model``.
    extract_params : dict
        Extraction parameters, as created by `get_extract_parameters`.
    exp_type : str
        Exposure type for the input data.

    Returns
    -------
    ra : float
        A representative RA value for the source centered in the aperture.
    dec : float
        A representative Dec value for the source centered in the aperture.
    wavelength : ndarray of float
        The 1D wavelength array, matching the dispersion dimension of the
        input spectral image, corresponding to the aperture. May contain
        NaNs for invalid dispersion elements.
    profile : ndarray of float
        A 2D image containing pixel weights for the extraction aperture,
        matching the dimensions of the input spectral image. Values are
        between 0.0 (pixel not included in the extraction aperture) and
        1.0 (pixel fully included in the aperture).
    bg_profile : ndarray of float or None
        If background regions are specified in
        ``extract_params['bkg_coeff']``, and
        ``extract_params['subtract_background']`` is True, then
        ``bg_profile`` is a 2D image containing pixel weights for
        background regions, to be fit during extraction. Otherwise,
        ``bg_profile`` is None.
    nod_profile : ndarray of float or None
        For optimal extraction, if nod subtraction was performed, a second
        spatial profile is generated, modeling the negative source in the
        slit. This second spatial profile is returned in ``nod_profile``
        if generated. Otherwise, ``nod_profile`` is None.
    limits : tuple of float
        Index limit values for the aperture, returned as (lower_limit,
        upper_limit, left_limit, right_limit). Upper/lower limits are
        along the cross-dispersion axis. Left/right limits are along the
        dispersion axis. All limits are inclusive and start at zero index
        value.
    """
    if slit is None:
        data_model = input_model
    else:
        data_model = slit
    data_shape = data_model.data.shape[-2:]

    # Get a wavelength array for the data
    wl_array = get_wavelengths(data_model, exp_type, extract_params["spectral_order"])

    # Shift aperture definitions by source position if needed
    # Extract parameters are updated in place
    if extract_params["use_source_posn"]:
        # Source location from WCS
        middle_pix, middle_wl, location, trace = location_from_wcs(input_model, slit)

        if location is not None:
            log.info(
                f"Computed source location is {location:.2f}, "
                f"at pixel {middle_pix}, wavelength {middle_wl:.2f}"
            )

            # Nominal location from extract params + located middle
            nominal_profile = box_profile(
                data_shape, extract_params, wl_array, label="nominal aperture"
            )
            nominal_location, _ = aperture_center(
                nominal_profile, extract_params["dispaxis"], middle_pix=middle_pix
            )

            # Offset extract parameters by location - nominal
            offset = location - nominal_location
            log.info(
                f"Nominal location is {nominal_location:.2f}, so offset is {offset:.2f} pixels"
            )
            shift_by_offset(offset, extract_params, update_trace=False)
    else:
        middle_pix, middle_wl, location, trace = None, None, None, None

    # Store the trace, if computed
    extract_params["trace"] = trace

    # Add an extra position offset if desired, from extract_params['position_offset']
    offset = extract_params.get("position_offset", 0.0)
    if offset != 0.0:
        log.info(f"Applying additional cross-dispersion offset {offset:.2f} pixels")
        shift_by_offset(offset, extract_params, update_trace=True)

    # Make a spatial profile, including source shifts if necessary
    nod_profile = None
    if extract_params["extraction_type"] == "optimal":
        profiles, lower_limit, upper_limit = psf_profile(
            data_model,
            extract_params["trace"],
            wl_array,
            extract_params["psf"],
            model_nod_pair=extract_params["model_nod_pair"],
            optimize_shifts=extract_params["optimize_psf_location"],
        )
        if len(profiles) > 1:
            profile, nod_profile = profiles
        else:
            profile = profiles[0]
    else:
        profile, lower_limit, upper_limit = box_profile(
            data_shape, extract_params, wl_array, return_limits=True
        )

    # Make sure profile weights are zero where wavelengths are invalid
    profile[~np.isfinite(wl_array)] = 0.0
    if nod_profile is not None:
        nod_profile[~np.isfinite(wl_array)] = 0.0

    # Get the effective left and right limits from the profile weights
    nonzero_weight = np.where(np.sum(profile, axis=extract_params["dispaxis"] - 1) > 0)[0]
    if len(nonzero_weight) > 0:
        left_limit = nonzero_weight[0]
        right_limit = nonzero_weight[-1]
    else:
        left_limit = None
        right_limit = None

    # Make a background profile if necessary
    # (will also include source shifts)
    if extract_params["subtract_background"] and extract_params["bkg_coeff"] is not None:
        bg_profile = box_profile(data_shape, extract_params, wl_array, coefficients="bkg_coeff")
    else:
        bg_profile = None

    # Get 1D wavelength corresponding to the spatial profile
    mask = np.isnan(wl_array) | (profile <= 0)
    masked_wl = np.ma.masked_array(wl_array, mask=mask)
    masked_weights = np.ma.masked_array(profile, mask=mask)
    if extract_params["dispaxis"] == HORIZONTAL:
        wavelength = np.average(masked_wl, weights=masked_weights, axis=0).filled(np.nan)
    else:
        wavelength = np.average(masked_wl, weights=masked_weights, axis=1).filled(np.nan)

    # Get RA and Dec corresponding to the center of the array,
    # weighted by the spatial profile
    center_y, center_x = aperture_center(profile, 1)
    coords = data_model.meta.wcs(center_x, center_y)
    if np.any(np.isnan(coords)):
        ra = None
        dec = None
    else:
        ra = float(coords[0])
        dec = float(coords[1])

    # Return limits as a tuple with 4 elements: lower, upper, left, right
    limits = (lower_limit, upper_limit, left_limit, right_limit)

    return ra, dec, wavelength, profile, bg_profile, nod_profile, limits
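
# define_aperture sketch (hypothetical inputs): for a single SlitModel
# "slit" from "input_model", with extraction parameters already matched
# and "dispaxis" set:
#
#     ra, dec, wavelength, profile, bg_profile, nod_profile, limits = \
#         define_aperture(input_model, slit, extract_params, "NRS_FIXEDSLIT")
#     lower, upper, left, right = limits  # inclusive, zero-indexed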


def extract_one_slit(data_model, integration, profile, bg_profile, nod_profile, extract_params):
    """
    Extract data for one slit, or spectral order, or integration.

    Parameters
    ----------
    data_model : JWSTDataModel
        The input science model. May be a single slit from a MultiSlitModel
        (or similar), or a single data type, like an ImageModel, SlitModel,
        or CubeModel.
    integration : int
        For the case that data_model is a SlitModel or a CubeModel,
        ``integration`` is the integration number. If the integration
        number is not relevant (i.e. the data array is 2-D),
        ``integration`` should be -1.
    profile : ndarray of float
        Spatial profile indicating the aperture location. Must be a
        2D image matching the input, with floating point values between
        0 and 1 assigning a weight to each pixel. 0 means the pixel is
        not used, 1 means the pixel is fully included in the aperture.
    bg_profile : ndarray of float or None
        Background profile indicating any background regions to use,
        following the same format as the spatial profile. Ignored if
        extract_params['subtract_background'] is False.
    nod_profile : ndarray of float or None
        For optimal extraction, if nod subtraction was performed, a
        second spatial profile is generated, modeling the negative source
        in the slit. This second spatial profile may be passed in
        ``nod_profile`` for simultaneous fitting with the primary source
        in ``profile``. Otherwise, ``nod_profile`` should be None.
    extract_params : dict
        Parameters read from the extract1d reference file, as returned by
        `get_extract_parameters`.

    Returns
    -------
    sum_flux : ndarray, 1-D, float64
        The sum of the data values in the extraction region minus the sum
        of the data values in the background regions (scaled by the ratio
        of the numbers of pixels), for each pixel. The data values are
        usually in units of surface brightness, so this value isn't the
        flux, it's an intermediate value. Multiply ``sum_flux`` by the
        solid angle of a pixel to get the flux for a point source (column
        "flux"). Divide ``sum_flux`` by ``npixels`` (to compute the
        average) to get the array for the "surf_bright" (surface
        brightness) output column.
    f_var_rnoise : ndarray, 1-D
        The extracted read noise variance values to go along with the
        sum_flux array.
    f_var_poisson : ndarray, 1-D
        The extracted poisson variance values to go along with the
        sum_flux array.
    f_var_flat : ndarray, 1-D
        The extracted flat field variance values to go along with the
        sum_flux array.
    background : ndarray, 1-D
        The background count rate that was subtracted from the sum of
        the source data values to get ``sum_flux``.
    b_var_rnoise : ndarray, 1-D
        The extracted read noise variance values to go along with the
        background array.
    b_var_poisson : ndarray, 1-D
        The extracted poisson variance values to go along with the
        background array.
    b_var_flat : ndarray, 1-D
        The extracted flat field variance values to go along with the
        background array.
    npixels : ndarray, 1-D, float64
        The number of pixels that were added together to get ``sum_flux``,
        including any fractional pixels included via non-integer weights
        in the input profile.
    scene_model : ndarray, 2-D, float64
        A 2D model of the flux in the spectral image, corresponding to
        the extracted aperture.
    residual : ndarray, 2-D, float64
        Residual image from the input minus the scene model.
    """
    # Get the data and variance arrays
    if integration > -1:
        log.debug(f"Extracting integration {integration + 1}")
        data = data_model.data[integration]
        var_rnoise = data_model.var_rnoise[integration]
        var_poisson = data_model.var_poisson[integration]
        var_flat = data_model.var_flat[integration]
    else:
        data = data_model.data
        var_rnoise = data_model.var_rnoise
        var_poisson = data_model.var_poisson
        var_flat = data_model.var_flat

    # Make sure variances match data
    if var_rnoise.shape != data.shape:
        var_rnoise = np.zeros_like(data)
    if var_poisson.shape != data.shape:
        var_poisson = np.zeros_like(data)
    if var_flat.shape != data.shape:
        var_flat = np.zeros_like(data)

    # Transpose data for extraction
    if extract_params["dispaxis"] == HORIZONTAL:
        bg_profile_view = bg_profile
        if nod_profile is not None:
            profiles = [profile, nod_profile]
        else:
            profiles = [profile]
    else:
        data = data.T
        var_rnoise = var_rnoise.T
        var_poisson = var_poisson.T
        var_flat = var_flat.T
        if bg_profile is not None:
            bg_profile_view = bg_profile.T
        else:
            bg_profile_view = None
        if nod_profile is not None:
            profiles = [profile.T, nod_profile.T]
        else:
            profiles = [profile.T]

    # Extract spectra from the data
    result = extract1d.extract1d(
        data,
        profiles,
        var_rnoise,
        var_poisson,
        var_flat,
        profile_bg=bg_profile_view,
        bg_smooth_length=extract_params["smoothing_length"],
        fit_bkg=extract_params["subtract_background"],
        bkg_fit_type=extract_params["bkg_fit"],
        bkg_order=extract_params["bkg_order"],
        extraction_type=extract_params["extraction_type"],
    )

    # Extraction routine can return multiple spectra;
    # here, we just want the first result
    first_result = []
    for r in result[:-1]:
        first_result.append(r[0])

    # The last return value is the 2D model - there is only one, regardless
    # of the number of input profiles. It may need to be transposed to match
    # the input data.
    scene_model = result[-1]
    residual = data - scene_model
    if extract_params["dispaxis"] == HORIZONTAL:
        first_result.append(scene_model)
        first_result.append(residual)
    else:
        first_result.append(scene_model.T)
        first_result.append(residual.T)
    return first_result
[docs] def create_extraction( input_model, slit, output_model, extract_ref_dict, slitname, sp_order, exp_type, apcorr_ref_model=None, log_increment=50, save_profile=False, save_scene_model=False, save_residual_image=False, **kwargs, ): """ Extract spectra from an input model and append to an output model. Input data, specified in the ``slit`` or ``input_model``, should contain data with consistent wavelengths, target positions, and spectral image cutouts, suitable for extraction with a shared aperture. Extraction parameters are determined collectively, then multiple integrations, if present, are each extracted separately. The output model must be a `MultiSpecModel` or `TSOMultiSpecModel`, created before calling this function, and passed as ``output_model``. It is updated in place, with new spectral tables appended as they are created. The process is: 1. Retrieve extraction parameters from ``extract_ref_dict`` and set defaults for any missing values as needed. 2. Define an extraction aperture from the input parameters, as well as the nominal RA, Dec, and 1D wavelength arrays for the output spectrum. 3. Set up an aperture correction to apply to each spectrum, if ``apcorr_ref_model`` is not None. 4. Loop over integrations to extract all spectra. For each integration, the extraction process is: 1. Extract summed flux and variance values from the aperture. 2. Compute an average value (from flux / npixels), to be stored as the surface brightness. 3. Convert the summed flux to flux density (Jy). 4. Compute an error spectrum from the square root of the sum of the variance components. 5. Set a DQ array, with DO_NOT_USE flags set where the flux is NaN. 6. Create a spectral table to contain all extracted values and store it in a `SpecModel`. 7. Apply the aperture correction to the spectral table, if available. 8. Append the new SpecModel to the output_model. Parameters ---------- input_model : DataModel Top-level datamodel containing metadata for the input data. If slit is not specified, ``input_model`` must also contain the spectral image(s) to extract, in the data attribute. slit : SlitModel or None One slit from an input MultiSlitModel or similar. If not None, ``slit`` must contain the spectral image(s) to extract, in the data attribute, along with appropriate WCS metadata. output_model : MultiSpecModel, TSOMultiSpecModel The output model to append spectral tables to. extract_ref_dict : dict or None Extraction parameters read in from the extract_1d reference file, or None, if there was no reference file. slitname : str Slit name for the input data. sp_order : int Spectral order for the input data. exp_type : str Exposure type for the input data. apcorr_ref_model : DataModel or None, optional The aperture correction reference datamodel, containing the APCORR reference file data. log_increment : int, optional If greater than 0 and the input data are multi-integration, a message will be written to the log every ``log_increment`` integrations. save_profile : bool, optional If True, the spatial profile created for the aperture will be returned as an ImageModel. If False, the return value is None. save_scene_model : bool, optional If True, the flux model created during extraction will be returned as an ImageModel or CubeModel. If False, the return value is None. save_residual_image : bool, optional If True, the residual image (from input minus scene model) will be returned as an ImageModel or CubeModel. If False, the return value is None. **kwargs Additional options to pass to `get_extract_parameters`. 
Returns ------- profile_model : ImageModel or None If ``save_profile`` is True, the return value is an ImageModel containing the spatial profile with aperture weights, used in extracting all integrations. scene_model : ImageModel, CubeModel, or None If ``save_scene_model`` is True, the return value is an ImageModel or CubeModel matching the input data, containing the flux model generated during extraction. residual : ImageModel, CubeModel, or None If ``save_residual_image`` is True, the return value is an ImageModel or CubeModel matching the input data, containing the residual image. """ if slit is None: data_model = input_model else: data_model = slit # Make sure NaNs and DQ flags match up in input pipe_utils.match_nans_and_flags(data_model) if exp_type in WFSS_EXPTYPES: instrument = input_model.meta.instrument.name else: instrument = data_model.meta.instrument.name if instrument is not None: instrument = instrument.upper() # We need a flag to indicate whether the photom step has been run. # If it hasn't, we'll copy the count rate to the flux column. s_photom = input_model.meta.cal_step.photom if s_photom is not None and s_photom.upper() == "COMPLETE": photom_has_been_run = True flux_units = "Jy" f_var_units = "Jy^2" sb_units = "MJy/sr" sb_var_units = "MJy^2 / sr^2" else: photom_has_been_run = False flux_units = "DN/s" f_var_units = "DN^2 / s^2" sb_units = "DN/s" sb_var_units = "DN^2 / s^2" log.warning("The photom step has not been run.") # Get the source type for the data if slit is not None: source_type = slit.source_type else: if isinstance(input_model, datamodels.SlitModel): source_type = input_model.source_type if source_type is None: source_type = input_model.meta.target.source_type input_model.source_type = source_type else: source_type = input_model.meta.target.source_type # Turn off use_source_posn if the source is not POINT if source_type != "POINT" or exp_type in WFSS_EXPTYPES: if kwargs.get("use_source_posn") is None: kwargs["use_source_posn"] = False log.info( f"Setting use_source_posn to False for exposure type {exp_type}, " f"source type {source_type}" ) if photom_has_been_run: pixel_solid_angle = data_model.meta.photometry.pixelarea_steradians if pixel_solid_angle is None: pixel_solid_angle = 1.0 log.warning("Pixel area (solid angle) is not populated; the flux will not be correct.") else: pixel_solid_angle = 1.0 # not needed extract_params = get_extract_parameters( extract_ref_dict, data_model, slitname, sp_order, input_model.meta, **kwargs ) if extract_params["match"] == NO_MATCH: log.critical("Missing extraction parameters.") raise ValueError("Missing extraction parameters.") elif extract_params["match"] == PARTIAL: log.info(f"Spectral order {sp_order} not found, skipping ...") raise ContinueError() extract_params["dispaxis"] = data_model.meta.wcsinfo.dispersion_direction if extract_params["dispaxis"] is None: log.warning("The dispersion direction information is missing, so skipping ...") raise ContinueError() # Set up spatial profiles and wavelength array, # to be used for every integration (ra, dec, wavelength, profile, bg_profile, nod_profile, limits) = define_aperture( input_model, slit, extract_params, exp_type ) valid = np.isfinite(wavelength) wavelength = wavelength[valid] if np.sum(valid) == 0: log.error("Spectrum is empty; no valid data.") raise ContinueError() # Save the profile if desired if save_profile: if nod_profile is not None: profile_model = datamodels.ImageModel(profile + nod_profile) else: profile_model = datamodels.ImageModel(profile) 
profile_model.update(input_model, only="PRIMARY") profile_model.name = slitname else: profile_model = None # Set up aperture correction, to be used for every integration apcorr_available = False if source_type is not None and source_type.upper() == "POINT" and apcorr_ref_model is not None: log.info("Creating aperture correction.") # NIRSpec needs to use a wavelength in the middle of the # range rather than the beginning of the range # for calculating the pixel scale since some wavelengths at the # edges of the range won't map to the sky if instrument == "NIRSPEC": wl = np.median(wavelength) else: wl = wavelength.min() kwargs = {"location": (ra, dec, wl)} if not isinstance(input_model, datamodels.ImageModel): kwargs["slit_name"] = slitname if exp_type in ["NRS_FIXEDSLIT", "NRS_BRIGHTOBJ"]: kwargs["slit"] = slitname apcorr_model = select_apcorr(input_model) apcorr = apcorr_model( input_model, apcorr_ref_model.apcorr_table, apcorr_ref_model.sizeunit, **kwargs ) else: apcorr = None # Log the parameters before extracting log_initial_parameters(extract_params) # Set up integration iterations and progress messages progress_msg_printed = True shape = data_model.data.shape if len(shape) == 3 and shape[0] == 1: integrations = [0] elif len(shape) == 2: integrations = [-1] else: log.info(f"Beginning loop over {shape[0]} integrations ...") integrations = range(shape[0]) progress_msg_printed = False # Set up a scene model and residual image to update if desired if save_scene_model: if len(integrations) > 1: scene_model = datamodels.CubeModel(shape) else: scene_model = datamodels.ImageModel() scene_model.update(input_model, only="PRIMARY") scene_model.name = slitname else: scene_model = None if save_residual_image: if len(integrations) > 1: residual = datamodels.CubeModel(shape) else: residual = datamodels.ImageModel() residual.update(input_model, only="PRIMARY") residual.name = slitname else: residual = None # Extract each integration spec_list = [] for integ in integrations: ( sum_flux, f_var_rnoise, f_var_poisson, f_var_flat, background, b_var_rnoise, b_var_poisson, b_var_flat, npixels, scene_model_2d, residual_2d, ) = extract_one_slit(data_model, integ, profile, bg_profile, nod_profile, extract_params) # Save the scene model and residual if save_scene_model: if isinstance(scene_model, datamodels.CubeModel): scene_model.data[integ] = scene_model_2d else: scene_model.data = scene_model_2d if save_residual_image: if isinstance(residual, datamodels.CubeModel): residual.data[integ] = residual_2d else: residual.data = residual_2d # Convert the sum to an average, for surface brightness. npixels_temp = np.where(npixels > 0.0, npixels, 1.0) npixels_squared = npixels_temp**2 if extract_params["extraction_type"] == "optimal": # surface brightness makes no sense for an optimal extraction surf_bright = np.zeros_like(sum_flux) sb_var_poisson = np.zeros_like(sum_flux) sb_var_rnoise = np.zeros_like(sum_flux) sb_var_flat = np.zeros_like(sum_flux) else: surf_bright = sum_flux / npixels_temp # may be reset below sb_var_poisson = f_var_poisson / npixels_squared sb_var_rnoise = f_var_rnoise / npixels_squared sb_var_flat = f_var_flat / npixels_squared background /= npixels_temp b_var_poisson = b_var_poisson / npixels_squared b_var_rnoise = b_var_rnoise / npixels_squared b_var_flat = b_var_flat / npixels_squared del npixels_temp # Convert to flux density. # The input units will normally be MJy / sr, but for NIRSpec # point-source spectra the units will be MJy. 
input_units_are_megajanskys = ( photom_has_been_run and source_type == "POINT" and instrument == "NIRSPEC" ) if photom_has_been_run: # for NIRSpec point sources if input_units_are_megajanskys: flux = sum_flux * 1.0e6 # MJy --> Jy f_var_poisson *= 1.0e12 # MJy**2 --> Jy**2 f_var_rnoise *= 1.0e12 # MJy**2 --> Jy**2 f_var_flat *= 1.0e12 # MJy**2 --> Jy**2 surf_bright[:] = 0.0 sb_var_poisson[:] = 0.0 sb_var_rnoise[:] = 0.0 sb_var_flat[:] = 0.0 background[:] /= pixel_solid_angle # MJy / sr b_var_poisson = b_var_poisson / pixel_solid_angle**2 b_var_rnoise = b_var_rnoise / pixel_solid_angle**2 b_var_flat = b_var_flat / pixel_solid_angle**2 else: flux = sum_flux * pixel_solid_angle * 1.0e6 # MJy / steradian --> Jy f_var_poisson *= pixel_solid_angle**2 * 1.0e12 # (MJy / sr)**2 --> Jy**2 f_var_rnoise *= pixel_solid_angle**2 * 1.0e12 # (MJy / sr)**2 --> Jy**2 f_var_flat *= pixel_solid_angle**2 * 1.0e12 # (MJy / sr)**2 --> Jy**2 else: flux = sum_flux # count rate del sum_flux error = np.sqrt(f_var_poisson + f_var_rnoise + f_var_flat) sb_error = np.sqrt(sb_var_poisson + sb_var_rnoise + sb_var_flat) berror = np.sqrt(b_var_poisson + b_var_rnoise + b_var_flat) # Set DQ from the flux value dq = np.zeros(flux.shape, dtype=np.uint32) dq[np.isnan(flux)] = datamodels.dqflags.pixel["DO_NOT_USE"] # Make a table of the values, trimming to points with valid wavelengths only otab = np.array( list( zip( wavelength, flux[valid], error[valid], f_var_poisson[valid], f_var_rnoise[valid], f_var_flat[valid], surf_bright[valid], sb_error[valid], sb_var_poisson[valid], sb_var_rnoise[valid], sb_var_flat[valid], dq[valid], background[valid], berror[valid], b_var_poisson[valid], b_var_rnoise[valid], b_var_flat[valid], npixels[valid], strict=False, ) ), dtype=datamodels.SpecModel().spec_table.dtype, ) spec = datamodels.SpecModel(spec_table=otab) spec.meta.wcs = spec_wcs.create_spectral_wcs(ra, dec, wavelength) spec.spec_table.columns["wavelength"].unit = "um" spec.spec_table.columns["flux"].unit = flux_units spec.spec_table.columns["flux_error"].unit = flux_units spec.spec_table.columns["flux_var_poisson"].unit = f_var_units spec.spec_table.columns["flux_var_rnoise"].unit = f_var_units spec.spec_table.columns["flux_var_flat"].unit = f_var_units spec.spec_table.columns["surf_bright"].unit = sb_units spec.spec_table.columns["sb_error"].unit = sb_units spec.spec_table.columns["sb_var_poisson"].unit = sb_var_units spec.spec_table.columns["sb_var_rnoise"].unit = sb_var_units spec.spec_table.columns["sb_var_flat"].unit = sb_var_units spec.spec_table.columns["background"].unit = sb_units spec.spec_table.columns["bkgd_error"].unit = sb_units spec.spec_table.columns["bkgd_var_poisson"].unit = sb_var_units spec.spec_table.columns["bkgd_var_rnoise"].unit = sb_var_units spec.spec_table.columns["bkgd_var_flat"].unit = sb_var_units spec.slit_ra = ra spec.slit_dec = dec spec.spectral_order = sp_order spec.dispersion_direction = extract_params["dispaxis"] spec.detector = input_model.meta.instrument.detector # Record aperture limits as x/y start/stop values lower_limit, upper_limit, left_limit, right_limit = limits if spec.dispersion_direction == HORIZONTAL: spec.extraction_xstart = left_limit + 1 spec.extraction_xstop = right_limit + 1 spec.extraction_ystart = lower_limit + 1 spec.extraction_ystop = upper_limit + 1 else: spec.extraction_xstart = lower_limit + 1 spec.extraction_xstop = upper_limit + 1 spec.extraction_ystart = left_limit + 1 spec.extraction_ystop = right_limit + 1 copy_keyword_info(data_model, slitname, spec) if exp_type in 
        if exp_type in WFSS_EXPTYPES:
            spectral_order = data_model.meta.wcsinfo.spectral_order
            if hasattr(data_model.meta, "filename"):
                # calwebb_spec3 case: no separate slit input to function
                spec.meta.filename = data_model.meta.filename
                spec.meta.group_id = _make_group_id(data_model, spectral_order)
            else:
                # calwebb_spec2 case: data_model is a slit, so this meta must come
                # from input_model. In this case, group_id is expected to be the same
                # for all slits, because the spec list corresponds to different
                # sources in the same exposure. The exception is spectral_order,
                # so we get that from data_model and handle it separately.
                spec.meta.filename = getattr(input_model.meta, "filename", None)
                spec.meta.group_id = _make_group_id(input_model, spectral_order)

            spec.extract2d_xstart = data_model.xstart
            spec.extract2d_ystart = data_model.ystart
            if data_model.xstart is None or data_model.ystart is None:
                spec.extract2d_xstop = None
                spec.extract2d_ystop = None
            else:
                spec.extract2d_xstop = data_model.xstart + data_model.xsize
                spec.extract2d_ystop = data_model.ystart + data_model.ysize

        if apcorr is not None:
            if hasattr(apcorr, "tabulated_correction"):
                if apcorr.tabulated_correction is not None:
                    apcorr_available = True

            # See whether we can reuse the previous aperture correction
            # object. If so, just apply the pre-computed correction to
            # save a ton of time.
            if apcorr_available:
                # reuse the last aperture correction
                apcorr.apply(spec.spec_table, use_tabulated=True)
            else:
                # Attempt to tabulate the aperture correction for later use.
                # If this fails, fall back on the old method.
                try:
                    apcorr.tabulate_correction(spec.spec_table)
                    apcorr.apply(spec.spec_table, use_tabulated=True)
                    log.debug("Tabulating aperture correction for use in multiple integrations.")
                except AttributeError:
                    log.debug("Computing aperture correction.")
                    apcorr.apply(spec.spec_table)

        spec_list.append(spec)

        if log_increment > 0 and (integ + 1) % log_increment == 0:
            if integ == -1:
                pass
            elif integ == 0:
                if input_model.data.shape[0] == 1:
                    log.info("1 integration done")
                    progress_msg_printed = True
                else:
                    log.info("... 1 integration done")
            elif integ == input_model.data.shape[0] - 1:
                log.info(f"All {input_model.data.shape[0]} integrations done")
                progress_msg_printed = True
            else:
                log.info(f"... {integ + 1} integrations done")

    if not progress_msg_printed:
        log.info(f"All {input_model.data.shape[0]} integrations done")

    if len(spec_list) > 1:
        # For multi-int data, assemble a single TSOSpecModel from the list of spectra
        tso_spec = make_tso_specmodel(spec_list, segment=input_model.meta.exposure.segment_number)

        # Add to the output model
        output_model.spec.append(tso_spec)
    else:
        # Nothing further needed; just append the only spectrum
        # created to the output model
        output_model.spec.append(spec_list[0])

    return profile_model, scene_model, residual
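
# The function below is an editorial sketch, not part of jwst: it illustrates
# the MJy / sr --> Jy flux-density conversion performed inside
# create_extraction above, using made-up values for the aperture sums and
# the pixel solid angle.
def _example_flux_density_conversion():
    """Sketch the aperture-sum to flux-density conversion (editorial, hypothetical values)."""
    import numpy as np

    sum_flux = np.array([2.5, 3.0])  # aperture sums, MJy / sr
    f_var_poisson = np.array([0.04, 0.09])  # variances, (MJy / sr)**2
    pixel_solid_angle = 2.35e-13  # sr; order of magnitude for a ~0.1 arcsec pixel

    # Multiply by the pixel solid angle (sr) and 1.0e6 (MJy --> Jy);
    # variances pick up the square of the same factor.
    flux = sum_flux * pixel_solid_angle * 1.0e6  # MJy / sr --> Jy
    f_var_poisson = f_var_poisson * pixel_solid_angle**2 * 1.0e12  # --> Jy**2
    error = np.sqrt(f_var_poisson)  # 1-sigma error, Jy
    return flux, error
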
def _make_group_id(model, spectral_order):
    """
    Generate a unique ID for an exposure group, including the spectral order.

    Parameters
    ----------
    model : DataModel
        The input data model.
    spectral_order : int
        The spectral order for the exposure.

    Returns
    -------
    str
        The group ID.
    """
    group_id = attrs_to_group_id(model.meta.observation)
    return group_id + f"_{spectral_order}"


def _make_output_model(data_model, meta_source):
    """
    Set up an output model matching the input.

    Parameters
    ----------
    data_model : DataModel
        A data model containing a "data" attribute.
    meta_source : DataModel
        A data model containing top-level metadata to copy to the output.

    Returns
    -------
    MultiSpecModel or TSOMultiSpecModel
        If the input data is multi-integration, a TSOMultiSpecModel is returned.
        Otherwise, a MultiSpecModel is returned.
    """
    multi_int = (data_model.data.ndim == 3) and (data_model.data.shape[0] > 1)
    if multi_int:
        output_model = datamodels.TSOMultiSpecModel()
    else:
        output_model = datamodels.MultiSpecModel()
    if hasattr(meta_source, "int_times"):
        output_model.int_times = meta_source.int_times.copy()
    output_model.update(meta_source, only="PRIMARY")
    return output_model
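
# A minimal editorial sketch (not part of jwst) of how _make_output_model
# selects the output container: a 3-D "data" array with more than one
# integration yields a TSOMultiSpecModel, anything else a MultiSpecModel.
# The model shapes below are arbitrary.
def _example_output_model_selection():
    """Demonstrate the multi-integration check in _make_output_model (editorial)."""
    import numpy as np
    from stdatamodels.jwst import datamodels

    tso_data = datamodels.CubeModel(np.zeros((5, 16, 16)))  # 5 integrations
    single_data = datamodels.ImageModel(np.zeros((16, 16)))  # single 2-D image

    assert isinstance(_make_output_model(tso_data, tso_data), datamodels.TSOMultiSpecModel)
    assert isinstance(_make_output_model(single_data, single_data), datamodels.MultiSpecModel)
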
[docs] def run_extract1d(
    input_model,
    extract_ref_name="N/A",
    apcorr_ref_name=None,
    psf_ref_name="N/A",
    extraction_type="box",
    smoothing_length=None,
    bkg_fit=None,
    bkg_order=None,
    log_increment=50,
    subtract_background=None,
    use_source_posn=None,
    position_offset=0.0,
    model_nod_pair=False,
    optimize_psf_location=True,
    save_profile=False,
    save_scene_model=False,
    save_residual_image=False,
):
    """
    Extract all 1-D spectra from an input model.

    Parameters
    ----------
    input_model : JWSTDataModel
        The input science model.
    extract_ref_name : str
        The name of the extract1d reference file, or "N/A".
    apcorr_ref_name : str or None
        Name of the APCORR reference file. Default is None.
    psf_ref_name : str
        The name of the PSF reference file, or "N/A".
    extraction_type : str
        Extraction type ('box' or 'optimal'). Optimal extraction is
        only available if ``psf_ref_name`` is not "N/A".
    smoothing_length : int or None
        Width of a boxcar function for smoothing the background regions.
    bkg_fit : str or None
        Type of fitting to apply to background values in each column
        (or row, if the dispersion is vertical). Allowed values are
        'mean', 'median', 'poly', or None.
    bkg_order : int or None
        Polynomial order for fitting to each column (or row, if the
        dispersion is vertical) of background. Only used if ``bkg_fit``
        is 'poly'. Allowed values are >= 0.
    log_increment : int
        If ``log_increment`` is greater than 0 and the input data are
        multi-integration, a message will be written to the log every
        ``log_increment`` integrations.
    subtract_background : bool or None
        User-supplied flag indicating whether the background should be
        subtracted. If None, the value in the extract_1d reference file
        will be used. If not None, this parameter overrides the value
        in the extract_1d reference file.
    use_source_posn : bool or None
        If True, the target and background positions specified in the
        reference file (or the default position, if there is no reference
        file) will be shifted to account for the source position offset.
    position_offset : float
        Number of pixels to shift the nominal source position in the
        cross-dispersion direction.
    model_nod_pair : bool
        If True, and if ``extraction_type`` is 'optimal', then a negative
        trace from nod subtraction is modeled alongside the positive
        source during extraction. Even if set to True, this will be
        attempted only if the input data has been background subtracted
        and the dither pattern indicates that only 2 nods were used.
    optimize_psf_location : bool
        If True, and if ``extraction_type`` is 'optimal', then the source
        location will be optimized, via iterative comparisons of the
        scene model with the input data.
    save_profile : bool
        If True, the spatial profiles created for the input model will be
        returned as ImageModels. If False, the return value is None.
    save_scene_model : bool
        If True, a model of the 2D flux as defined by the extraction
        aperture is returned as an ImageModel or CubeModel. If False,
        the return value is None.
    save_residual_image : bool
        If True, the residual image (from the input minus the scene model)
        is returned as an ImageModel or CubeModel. If False, the return
        value is None.

    Returns
    -------
    output_model : MultiSpecModel or TSOMultiSpecModel
        A new data model containing the extracted spectra.
    profile_model : ModelContainer, ImageModel, or None
        If ``save_profile`` is True, the return value is an ImageModel
        containing the spatial profile with aperture weights, used in
        extracting a single slit, or else a container of ImageModels,
        one for each slit extracted. Otherwise, the return value is None.
    scene_model : ModelContainer, ImageModel, CubeModel, or None
        If ``save_scene_model`` is True, the return value is an ImageModel
        or CubeModel matching the input data, containing a model of the
        flux as defined by the aperture, created during extraction.
        Otherwise, the return value is None.
    residual : ModelContainer, ImageModel, CubeModel, or None
        If ``save_residual_image`` is True, the return value is an
        ImageModel or CubeModel matching the input data, containing the
        residual image (from the input minus the scene model).
        Otherwise, the return value is None.
    """
    # Set "meta_source" to either the first model in a container,
    # or the individual input model, for convenience
    # of retrieving meta attributes in subsequent statements
    if isinstance(input_model, ModelContainer):
        meta_source = input_model[0]
    else:
        meta_source = input_model

    # Get the exposure type
    exp_type = meta_source.meta.exposure.type

    # Read in the extract1d reference file.
    extract_ref_dict = read_extract1d_ref(extract_ref_name)

    # Check for a non-null PSF reference file
    if psf_ref_name == "N/A":
        if extraction_type != "box":
            log.warning(f"Optimal extraction is not available for EXP_TYPE {exp_type}")
            log.warning("Defaulting to box extraction.")
            extraction_type = "box"

    # Read in the aperture correction reference file
    apcorr_ref_model = None
    if apcorr_ref_name is not None and apcorr_ref_name != "N/A":
        if extraction_type == "optimal":
            log.warning("Turning off aperture correction for optimal extraction")
        else:
            apcorr_ref_model = read_apcorr_ref(apcorr_ref_name, exp_type)

    # This will be relevant if we're asked to extract a spectrum
    # and the spectral order is zero.
    # That's only OK if the disperser is a prism.
    prism_mode = is_prism(meta_source)

    # Handle inputs that contain one or more slit models
    profile_model = None
    scene_model = None
    residual = None
    if isinstance(input_model, (ModelContainer, datamodels.MultiSlitModel)):
        if isinstance(input_model, ModelContainer):
            slits = input_model
        else:
            slits = input_model.slits

        # Save the original use_source_posn value, because it can get
        # toggled within the following loop over slits
        save_use_source_posn = use_source_posn

        # Make containers for the profile models, if needed
        if save_profile:
            profile_model = ModelContainer()
        if save_scene_model:
            scene_model = ModelContainer()
        if save_residual_image:
            residual = ModelContainer()

        # Set up the output model
        output_model = _make_output_model(slits[0], meta_source)

        # Loop over the slits in the input model
        for slit in slits:
            log.info(f"Working on slit {slit.name}")
            log.debug(f"Slit is of type {type(slit)}")

            slitname = slit.name
            use_source_posn = save_use_source_posn  # restore original value

            if np.size(slit.data) <= 0:
                log.info(f"No data for slit {slit.name}, skipping ...")
                continue

            sp_order = get_spectral_order(slit)
            if sp_order == 0 and not prism_mode:
                log.info("Spectral order 0 is a direct image, skipping ...")
                continue

            try:
                profile, slit_scene_model, slit_residual = create_extraction(
                    meta_source,
                    slit,
                    output_model,
                    extract_ref_dict,
                    slitname,
                    sp_order,
                    exp_type,
                    apcorr_ref_model=apcorr_ref_model,
                    log_increment=log_increment,
                    save_profile=save_profile,
                    save_scene_model=save_scene_model,
                    save_residual_image=save_residual_image,
                    psf_ref_name=psf_ref_name,
                    extraction_type=extraction_type,
                    smoothing_length=smoothing_length,
                    bkg_fit=bkg_fit,
                    bkg_order=bkg_order,
                    subtract_background=subtract_background,
                    use_source_posn=use_source_posn,
                    position_offset=position_offset,
                    model_nod_pair=model_nod_pair,
                    optimize_psf_location=optimize_psf_location,
                )
            except ContinueError:
                continue

            if save_profile:
                profile_model.append(profile)
            if save_scene_model:
                scene_model.append(slit_scene_model)
            if save_residual_image:
                residual.append(slit_residual)

    else:
        # Define the source of metadata
        slit = None

        # Default the slitname to the exp_type, in case there's no
        # better value available
        slitname = exp_type
        if isinstance(input_model, datamodels.ImageModel):
            # Get the slit name from the input model
            if hasattr(input_model, "name") and input_model.name is not None:
                slitname = input_model.name

        elif isinstance(input_model, (datamodels.CubeModel, datamodels.SlitModel)):
            # For NRS_BRIGHTOBJ, the slit name comes from the slit model info
            if exp_type == "NRS_BRIGHTOBJ" and hasattr(input_model, "name"):
                slitname = input_model.name

            # For NRS_FIXEDSLIT, the slit name comes from the FXD_SLIT keyword
            # in the model meta if not present in the input model
            if exp_type == "NRS_FIXEDSLIT":
                if hasattr(input_model, "name") and input_model.name is not None:
                    slitname = input_model.name
                else:
                    slitname = input_model.meta.instrument.fixed_slit

        else:
            log.error("The input file is not supported for this step.")
            raise TypeError("Can't extract a spectrum from this file.")

        # Set up the output model
        output_model = _make_output_model(input_model, meta_source)

        sp_order = get_spectral_order(input_model)
        if sp_order == 0 and not prism_mode:
            log.info("Spectral order 0 is a direct image, skipping ...")
        else:
            log.info(f"Processing spectral order {sp_order}")

            try:
                profile_model, scene_model, residual = create_extraction(
                    input_model,
                    slit,
                    output_model,
                    extract_ref_dict,
                    slitname,
                    sp_order,
                    exp_type,
                    apcorr_ref_model=apcorr_ref_model,
                    log_increment=log_increment,
                    save_profile=save_profile,
                    save_scene_model=save_scene_model,
                    save_residual_image=save_residual_image,
                    psf_ref_name=psf_ref_name,
                    extraction_type=extraction_type,
                    smoothing_length=smoothing_length,
                    bkg_fit=bkg_fit,
                    bkg_order=bkg_order,
                    subtract_background=subtract_background,
                    use_source_posn=use_source_posn,
                    position_offset=position_offset,
                    model_nod_pair=model_nod_pair,
                    optimize_psf_location=optimize_psf_location,
                )
            except ContinueError:
                pass

    # Copy the integration time information from the INT_TIMES table
    # to keywords in the output file.
    if pipe_utils.is_tso(input_model):
        # The int_times extension can stay for all TSO observations, but time
        # keywords are only populated for multi-int spectra
        if isinstance(output_model, datamodels.TSOMultiSpecModel):
            populate_time_keywords(input_model, output_model)
    else:
        log.debug("Not copying from the INT_TIMES table because this is not a TSO exposure.")
        if hasattr(output_model, "int_times"):
            del output_model.int_times

    output_model.meta.wcs = None  # See output_model.spec[i].meta.wcs instead.

    if apcorr_ref_model is not None:
        apcorr_ref_model.close()

    # Remove target.source_type from the output model, so that it
    # doesn't force creation of an empty SCI extension in the output
    # x1d product just to hold this keyword.
    output_model.meta.target.source_type = None

    return output_model, profile_model, scene_model, residual
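
# A hypothetical usage sketch, not part of jwst: run_extract1d is normally
# invoked by the Extract1dStep, which fetches the reference files from CRDS,
# but it can also be called directly on a calibrated product. The file names
# below are placeholders.
def _example_run_extract1d():
    """Box-extract spectra from a calibrated exposure (editorial sketch)."""
    from stdatamodels.jwst import datamodels

    with datamodels.open("example_cal.fits") as model:  # placeholder input file
        output, profile, scene, residual = run_extract1d(
            model,
            extract_ref_name="example_extract1d_ref.json",  # placeholder reference
            apcorr_ref_name="N/A",  # skip aperture correction in this sketch
            extraction_type="box",
            save_profile=True,
        )
        # The extracted spectra are in output.spec; save as an x1d product.
        output.save("example_x1d.fits")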