Source code for jwst.ami.ami_analyze_step

import asdf
import numpy as np
from stdatamodels.jwst import datamodels

from jwst.ami import ami_analyze, utils
from jwst.stpipe import Step

__all__ = ["AmiAnalyzeStep"]

# Affine parameters from commissioning
MX_COMM = 9.92820e-01
MY_COMM = 9.98540e-01
SX_COMM = 6.18605e-03
SY_COMM = -7.27008e-03
XO_COMM = 0.0
YO_COMM = 0.0


class BandpassError(Exception):
    pass


[docs] class AmiAnalyzeStep(Step): """Performs analysis of an AMI mode exposure by applying the LG algorithm.""" class_alias = "ami_analyze" spec = """ oversample = integer(default=3, min=1) # Oversampling factor rotation = float(default=0.0) # Rotation initial guess [deg] psf_offset = string(default='0.0 0.0') # PSF offset values to use to create the model array rotation_search = string(default='-3 3 1') # Rotation search parameters: start, stop, step bandpass = string(default=None) # ASDF file containing array to override filter/source usebp = boolean(default=True) # If True, exclude pixels marked DO_NOT_USE from fringe fitting firstfew = integer(default=None) # If not None, process only the first few integrations chooseholes = string(default=None) # If not None, fit only certain fringes e.g. ['B4','B5','B6','C2'] affine2d = string(default='commissioning') # ASDF file containing user-defined affine parameters OR 'commssioning' run_bpfix = boolean(default=True) # Run Fourier bad pixel fix on cropped data """ # noqa: E501 reference_file_types = ["throughput", "nrm"]
[docs] def save_model(self, model, *args, **kwargs): """ Override save_model to change suffix based on list of results. Parameters ---------- model : data model The model to save *args, **kwargs : tuple, dict Arguments to pass to the stpipe Step.save_model method Returns ------- output_paths : [str[, ...]] List of output file paths the model(s) were saved in. """ if "idx" in kwargs and kwargs.get("suffix", None) is None: kwargs["suffix"] = ["ami-oi", "amimulti-oi", "amilg"][kwargs.pop("idx")] return Step.save_model(self, model, *args, **kwargs)
[docs] def override_bandpass(self): """ Read bandpass from asdf file and use it to override the default. Expects an array of [effstims, wave_m] (i.e. np.array((effstims,wave_m)).T) stored as 'bandpass' in asdf file, where effstims are normalized countrates (unitless) and wave_m are the wavelengths across the filter at which to compute the model (meters). Returns ------- bandpass : array Array of [countrates, wavelengths] """ try: with asdf.open(self.bandpass, lazy_load=False) as af: bandpass = np.array(af["bandpass"]) # assume it is an array of the correct shape wavemin = np.min(bandpass[:, 1]) wavemax = np.max(bandpass[:, 1]) self.log.info("User-defined bandpass provided:") self.log.info("\tOVERWRITING ALL NIRISS-SPECIFIC FILTER/BANDPASS VARIABLES") self.log.info(f"Using {bandpass.shape[0]} wavelengths for fit.") self.log.info(f"Wavelength min: {wavemin:.3e} \t Wavelength max: {wavemax:.3e}") # update attribute and return self.bandpass = bandpass except FileNotFoundError as e: message = f"File {self.bandpass} could not be found at the specified location." raise BandpassError(message) from e except KeyError as e: message1 = 'ASDF file does not contain the required "bandpass" key. ' message2 = "See step documentation for info on creating a custom bandpass ASDF file." raise BandpassError(message1 + message2) from e except (IndexError, ValueError) as e: message1 = f"Could not use bandpass from {self.bandpass}. It may have the wrong shape. " message2 = "See documentation for info on creating a custom bandpass ASDF file." raise BandpassError(message1 + message2) from e else: return bandpass
[docs] def override_affine2d(self): """ Read user-input affine transform from ASDF file. Makes an Affine2d object (see utils.Affine2D class). Input should contain mx,my,sx,sy,xo,yo,rotradccw. Returns ------- affine2d : obj User-defined affine transform (``jwst.ami.utils.Affine2d``). """ msg_defaulting = "\t **** DEFAULTING TO USE IDENTITY TRANSFORM ****" try: with asdf.open(self.affine2d, lazy_load=False) as af: affine2d = utils.Affine2d( mx=af["mx"], my=af["my"], sx=af["sx"], sy=af["sy"], xo=af["xo"], yo=af["yo"], rotradccw=af["rotradccw"], ) self.log.info(f"Using affine transform from ASDF file {self.affine2d}") # now self.affine2d updated from string to object self.affine2d = affine2d except FileNotFoundError: self.log.info(f"File {self.affine2d} could not be found at the specified location.") self.log.info(msg_defaulting) affine2d = None except KeyError: message1 = ( "ASDF file does not contain all of the required keys: " "mx, my, sx, sy ,xo, yo, rotradccw. " ) message2 = "See step documentation for info on creating a custom affine2d ASDF file." self.log.info(message1 + message2) self.log.info(msg_defaulting) affine2d = None except (IndexError, TypeError, ValueError): message1 = f"Could not use affine2d from {self.affine2d}. " message2 = "See documentation for info on creating a custom bandpass ASDF file." self.log.info(message1 + message2) self.log.info(msg_defaulting) affine2d = None else: return affine2d self.affine2d = affine2d return affine2d
[docs] def process(self, input_data): """ Perform analysis of an AMI mode exposure by applying the LG algorithm. Parameters ---------- input_data : str or datamodel Input file name or datamodel Returns ------- oifitsmodel : AmiOIModel object AMI tables of median observables from LG algorithm fringe fitting in OIFITS format oifitsmodel_multi : AmiOIModel object AMI tables of observables for each integration from LG algorithm fringe fitting in OIFITS format amilgmodel : AmiLGFitModel object AMI cropped data, model, and residual data from LG algorithm fringe fitting """ # Retrieve the parameter values oversample = self.oversample rotate = self.rotation bandpass = self.bandpass usebp = self.usebp firstfew = self.firstfew chooseholes = self.chooseholes affine2d = self.affine2d run_bpfix = self.run_bpfix # pull out parameters that are strings and change to floats psf_offset = [float(a) for a in self.psf_offset.split()] rotsearch_parameters = [float(a) for a in self.rotation_search.split()] # handle command-line None input interpreted as string if str(affine2d).lower() == "none": affine2d = None self.log.info(f"Oversampling factor = {oversample}") self.log.info(f"Initial rotation guess = {rotate} deg") self.log.info(f"Initial values to use for psf offset = {psf_offset}") # Make sure oversample is odd if oversample % 2 == 0: raise ValueError("Oversample value must be an odd integer.") # Open the input data model. Can be 2D or 3D image with datamodels.open(input_data) as input_model: # Get the name of the filter throughput reference file to use throughput_reffile = self.get_reference_file(input_model, "throughput") self.log.info(f"Using filter throughput reference file {throughput_reffile}") # Check for a valid reference file or user-provided bandpass if (throughput_reffile == "N/A") & (bandpass is None): raise RuntimeError( "No THROUGHPUT reference file found. ami_analyze cannot continue." ) # If there's a user-defined bandpass or affine, handle it if bandpass is not None: bandpass = self.override_bandpass() if affine2d is not None: if affine2d == "commissioning": affine2d = utils.Affine2d( mx=MX_COMM, my=MY_COMM, sx=SX_COMM, sy=SY_COMM, xo=XO_COMM, yo=YO_COMM, name="commissioning", ) self.log.info("Using affine parameters from commissioning.") else: affine2d = self.override_affine2d() # Get the name of the NRM reference file to use nrm_reffile = self.get_reference_file(input_model, "nrm") self.log.info(f"Using NRM reference file {nrm_reffile}") with ( datamodels.ThroughputModel(throughput_reffile) as throughput_model, datamodels.NRMModel(nrm_reffile) as nrm_model, ): # Apply the LG+ methods to the data oifitsmodel, oifitsmodel_multi, amilgmodel = ami_analyze.apply_lg_plus( input_model, throughput_model, nrm_model, oversample, psf_offset, rotsearch_parameters, bandpass, usebp, firstfew, chooseholes, affine2d, run_bpfix, ) amilgmodel.meta.cal_step.ami_analyze = "COMPLETE" oifitsmodel.meta.cal_step.ami_analyze = "COMPLETE" oifitsmodel_multi.meta.cal_step.ami_analyze = "COMPLETE" return oifitsmodel, oifitsmodel_multi, amilgmodel