Source code for jwst.ramp_fitting.ramp_fit_step

#! /usr/bin/env python
import logging

import numpy as np
from stcal.ramp_fitting import ramp_fit
from stcal.ramp_fitting.likely_fit import LIKELY_MIN_NGROUPS
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import dqflags

from jwst.lib import reffile_utils
from jwst.stpipe import Step

log = logging.getLogger(__name__)

__all__ = ["RampFitStep"]


def get_reference_file_subarrays(model, readnoise_model, gain_model):
    """
    Get read noise and gain reference arrays.

    Get readnoise array for calculation of variance of noiseless ramps, and
    the gain array in case optimal weighting is to be done. The returned
    readnoise has been multiplied by the gain.

    Parameters
    ----------
    model : data model
        Input data model, assumed to be of type RampModel.
    readnoise_model : instance of data Model
        Readnoise for all pixels.
    gain_model : instance of gain Model
        Gain for all pixels.

    Returns
    -------
    readnoise_2d : float, 2D array
        Readnoise subarray
    gain_2d : float, 2D array
        Gain subarray
    """
    if reffile_utils.ref_matches_sci(model, gain_model):
        gain_2d = gain_model.data
    else:
        log.info("Extracting gain subarray to match science data")
        gain_2d = reffile_utils.get_subarray_model(model, gain_model).data

    if reffile_utils.ref_matches_sci(model, readnoise_model):
        readnoise_2d = readnoise_model.data
    else:
        log.info("Extracting readnoise subarray to match science data")
        readnoise_2d = reffile_utils.get_subarray_model(model, readnoise_model).data

    return readnoise_2d, gain_2d


def create_image_model(input_model, image_info):
    """
    Create an ImageModel from the computed arrays from ramp_fit.

    Parameters
    ----------
    input_model : RampModel
        Input RampModel for which the output ImageModel is created.
    image_info : tuple
        The ramp fitting arrays needed for the ImageModel.

    Returns
    -------
    out_model : ImageModel
        The output ImageModel to be returned from the ramp fit step.
    """
    data, dq, var_poisson, var_rnoise, err = image_info

    # Create output datamodel
    out_model = datamodels.ImageModel(data.shape)

    # ... and add all keys from input
    out_model.update(input_model)

    # Populate with output arrays
    out_model.data = data
    out_model.dq = dq
    out_model.var_poisson = var_poisson
    out_model.var_rnoise = var_rnoise
    out_model.err = err

    return out_model


def create_integration_model(input_model, integ_info, int_times):
    """
    Create an ImageModel from the computed arrays from ramp_fit.

    Parameters
    ----------
    input_model : RampModel
        Input RampModel for which the output CubeModel is created.
    integ_info : tuple
        The ramp fitting arrays needed for the CubeModel for each integration.
    int_times : astropy.io.fits.fitsrec.FITS_rec or None
        Integration times.

    Returns
    -------
    int_model : CubeModel
        The output CubeModel to be returned from the ramp fit step.
    """
    data, dq, var_poisson, var_rnoise, err = integ_info
    int_model = datamodels.CubeModel(
        data=np.zeros(data.shape, dtype=np.float32),
        dq=np.zeros(data.shape, dtype=np.uint32),
        var_poisson=np.zeros(data.shape, dtype=np.float32),
        var_rnoise=np.zeros(data.shape, dtype=np.float32),
        err=np.zeros(data.shape, dtype=np.float32),
    )
    int_model.update(input_model)  # ... and add all keys from input

    int_model.data = data
    int_model.dq = dq
    int_model.var_poisson = var_poisson
    int_model.var_rnoise = var_rnoise
    int_model.err = err
    int_model.int_times = int_times

    return int_model


def create_optional_results_model(input_model, opt_info):
    """
    Create an ImageModel from the computed arrays from ramp_fit.

    Parameters
    ----------
    input_model : RampModel
        The input ramp model used to create the optional results product.
    opt_info : tuple
        The ramp fitting arrays needed for the RampFitOutputModel.

    Returns
    -------
    opt_model : RampFitOutputModel
        The optional RampFitOutputModel to be returned from the ramp fit step.
    """
    opt_model = datamodels.RampFitOutputModel(
        slope=opt_info[0],
        sigslope=opt_info[1],
        var_poisson=opt_info[2],
        var_rnoise=opt_info[3],
        yint=opt_info[4],
        sigyint=opt_info[5],
        pedestal=opt_info[6],
        weights=opt_info[7],
        crmag=opt_info[8],
    )

    opt_model.meta.filename = input_model.meta.filename
    opt_model.update(input_model)  # ... and add all keys from input

    return opt_model


[docs] class RampFitStep(Step): """Fit ramp data to determine the mean count rate.""" class_alias = "ramp_fit" spec = """ algorithm = option('OLS_C', 'LIKELY', default='OLS_C') int_name = string(default='') save_opt = boolean(default=False) # Save optional output opt_name = string(default='') suppress_one_group = boolean(default=True) # Suppress saturated ramps with good 0th group firstgroup = integer(default=None) # Ignore groups before this one (zero indexed) lastgroup = integer(default=None) # Ignore groups after this one (zero indexed) maximum_cores = string(default='1') # cores for multiprocessing. Can be an integer, 'half', 'quarter', or 'all' """ # noqa: E501 weighting = "optimal" # Only weighting allowed for Build 7.1 reference_file_types = ["readnoise", "gain"]
[docs] def process(self, step_input): """ Fit ramps using the specified ramp fitting algorithm. Parameters ---------- step_input : RampModel The input ramp model to fit the ramps. Returns ------- out_model : ImageModel The output 2-D image model with the fit ramps. int_model : CubeModel The output 3-D image model with the fit ramps for each integration. """ # Open the input data model with datamodels.RampModel(step_input) as input_model: # Work on a copy result = input_model.copy() max_cores = self.maximum_cores readnoise_filename = self.get_reference_file(result, "readnoise") gain_filename = self.get_reference_file(result, "gain") ngroups = input_model.data.shape[1] if self.algorithm.upper() == "LIKELY" and ngroups < LIKELY_MIN_NGROUPS: log.info( f"When selecting the LIKELY ramp fitting algorithm the" f" ngroups needs to be a minimum of {LIKELY_MIN_NGROUPS}," f" but ngroups = {ngroups}. Due to this, the ramp fitting algorithm" f" is being changed to OLS_C" ) self.algorithm = "OLS_C" log.info(f"Using READNOISE reference file: {readnoise_filename}") log.info(f"Using GAIN reference file: {gain_filename}") with ( datamodels.ReadnoiseModel(readnoise_filename) as readnoise_model, datamodels.GainModel(gain_filename) as gain_model, ): # Try to retrieve the gain factor from the gain reference file. # If found, store it in the science model meta data, so that it's # available later in the gain_scale step, which avoids having to # load the gain ref file again in that step. if gain_model.meta.exposure.gain_factor is not None: result.meta.exposure.gain_factor = gain_model.meta.exposure.gain_factor # Get gain arrays, subarrays if desired. readnoise_2d, gain_2d = get_reference_file_subarrays( result, readnoise_model, gain_model ) log.info(f"Using algorithm = {self.algorithm}") log.info(f"Using weighting = {self.weighting}") int_times = result.int_times # Set the DO_NOT_USE bit in the groupdq values for groups before firstgroup # and groups after lastgroup firstgroup = self.firstgroup lastgroup = self.lastgroup groupdqflags = dqflags.group if firstgroup is not None or lastgroup is not None: set_groupdq(firstgroup, lastgroup, ngroups, result.groupdq, groupdqflags) # Run ramp_fit(), ignoring all DO_NOT_USE groups, and return the # ramp fitting arrays for the ImageModel, the CubeModel, and the # RampFitOutputModel. image_info, integ_info, opt_info = ramp_fit.ramp_fit( result, self.save_opt, readnoise_2d, gain_2d, self.algorithm, self.weighting, max_cores, dqflags.pixel, suppress_one_group=self.suppress_one_group, ) # Save the OLS_C optional fit product, if it exists. if opt_info is not None: opt_model = create_optional_results_model(result, opt_info) self.save_model(opt_model, "fitopt", output_file=self.opt_name) out_model, int_model = None, None # Create models from possibly updated info if image_info is not None and integ_info is not None: out_model = create_image_model(result, image_info) out_model.meta.bunit_data = "DN/s" out_model.meta.bunit_err = "DN/s" out_model.meta.cal_step.ramp_fit = "COMPLETE" if (result.meta.exposure.type in ["NRS_IFU", "MIR_MRS"]) or ( result.meta.exposure.type in ["NRS_AUTOWAVE", "NRS_LAMP"] and result.meta.instrument.lamp_mode == "IFU" ): out_model = datamodels.IFUImageModel(out_model) int_model = create_integration_model(result, integ_info, int_times) int_model.meta.bunit_data = "DN/s" int_model.meta.bunit_err = "DN/s" int_model.meta.cal_step.ramp_fit = "COMPLETE" # Cleanup del result return out_model, int_model
def set_groupdq(firstgroup, lastgroup, ngroups, groupdq, groupdqflags): """ Set the groupdq flags based on the values of firstgroup, lastgroup. Parameters ---------- firstgroup : int or None The first group to be used in the ramp fitting lastgroup : int or None The last group to be used in the ramp fitting ngroups : int The number of groups in the ramp groupdq : ndarray The groupdq array to be modified in place groupdqflags : dict The groupdq flags dict """ dnu = groupdqflags["DO_NOT_USE"] if firstgroup is None: firstgroup = 0 if lastgroup is None: lastgroup = ngroups - 1 if firstgroup < 0: log.warning("first group < 0, reset to 0") firstgroup = 0 if lastgroup >= ngroups: log.warning(f"Last group number >= #groups ({ngroups}), reset to {ngroups - 1}") if firstgroup >= lastgroup: log.warning(f"firstgroup ({firstgroup}) cannot be >= lastgroup ({lastgroup})") log.warning("Group selectors ignored") firstgroup = 0 lastgroup = ngroups - 1 if firstgroup > 0: groupdq[:, :firstgroup] = np.bitwise_or(groupdq[:, :firstgroup], dnu) if lastgroup < (ngroups - 1): groupdq[:, (lastgroup + 1) :] = np.bitwise_or(groupdq[:, (lastgroup + 1) :], dnu)