Source code for jwst.extract_1d.extract_1d_step
import crds
from stdatamodels.jwst import datamodels
from jwst.datamodels import ModelContainer, SourceModelContainer
from jwst.datamodels.utils.wfss_multispec import make_wfss_multiexposure
from jwst.extract_1d import extract
from jwst.extract_1d.ifu import ifu_extract1d
from jwst.extract_1d.soss_extract import soss_extract
from jwst.stpipe import Step
__all__ = ["Extract1dStep"]
[docs]
class Extract1dStep(Step):
"""Extract a 1D spectrum from 2D data."""
class_alias = "extract_1d"
spec = """
subtract_background = boolean(default=None) # subtract background?
apply_apcorr = boolean(default=True) # apply aperture corrections?
extraction_type = option("box", "optimal", None, default="box") # Perform box or optimal extraction
use_source_posn = boolean(default=None) # use source coords to center extractions?
position_offset = float(default=0) # number of pixels to shift source trace in the cross-dispersion direction
model_nod_pair = boolean(default=True) # For optimal extraction, model a negative nod if possible
optimize_psf_location = boolean(default=True) # For optimal extraction, optimize source location
smoothing_length = integer(default=None) # background smoothing size
bkg_fit = option("poly", "mean", "median", None, default=None) # background fitting type
bkg_order = integer(default=None, min=0) # order of background polynomial fit
log_increment = integer(default=50) # increment for multi-integration log messages
save_profile = boolean(default=False) # save spatial profile to disk
save_scene_model = boolean(default=False) # save flux model to disk
save_residual_image = boolean(default=False) # save residual image to disk
center_xy = float_list(min=2, max=2, default=None) # IFU extraction x/y center
ifu_autocen = boolean(default=False) # Auto source centering for IFU point source data.
bkg_sigma_clip = float(default=3.0) # background sigma clipping threshold for IFU
ifu_rfcorr = boolean(default=True) # Apply 1d residual fringe correction (MIRI MRS only)
ifu_set_srctype = option("POINT", "EXTENDED", None, default=None) # user-supplied source type
ifu_rscale = float(default=None, min=0.5, max=3) # Radius in terms of PSF FWHM to scale extraction radii
ifu_covar_scale = float(default=1.0) # Scaling factor to apply to errors to account for IFU cube covariance
soss_atoca = boolean(default=True) # use ATOCA algorithm
soss_threshold = float(default=1e-2) # TODO: threshold could be removed from inputs. Its use is too specific now.
soss_n_os = integer(default=2) # minimum oversampling factor of the underlying wavelength grid used when modeling trace.
soss_wave_grid_in = input_file(default = None) # Input wavelength grid used to model the detector
soss_wave_grid_out = string(default = None) # Output wavelength grid solution filename
soss_estimate = input_file(default = None) # Estimate used to generate the wavelength grid
soss_rtol = float(default=1.0e-4) # Relative tolerance needed on a pixel model
soss_max_grid_size = integer(default=20000) # Maximum grid size, if wave_grid not specified
soss_tikfac = float(default=None) # regularization factor for NIRISS SOSS extraction
soss_width = float(default=40.) # aperture width used to extract the 1D spectrum from the de-contaminated trace.
soss_bad_pix = option("model", "masking", default="masking") # method used to handle bad pixels
soss_modelname = output_file(default = None) # Filename for optional model output of traces and pixel weights
""" # noqa: E501
reference_file_types = ["extract1d", "apcorr", "pastasoss", "specprofile", "speckernel", "psf"]
def _get_extract_reference_files_by_mode(self, model, exp_type):
"""
Get extraction reference files with special handling by exposure type.
Parameters
----------
model : DataModel
The input model.
exp_type : str
Exposure type.
Returns
-------
extract_ref : str
EXTRACT1D reference file or N/A.
apcorr_ref : str
APCORR reference file or N/A.
psf_ref : str
PSF reference file or N/A.
"""
if isinstance(model, ModelContainer):
model = model[0]
if exp_type not in extract.WFSS_EXPTYPES:
extract_ref = self.get_reference_file(model, "extract1d")
else:
extract_ref = "N/A"
if extract_ref != "N/A":
self.log.info(f"Using EXTRACT1D reference file {extract_ref}")
if self.apply_apcorr:
apcorr_ref = self.get_reference_file(model, "apcorr")
else:
apcorr_ref = "N/A"
if apcorr_ref != "N/A":
self.log.info(f"Using APCORR file {apcorr_ref}")
try:
psf_ref = self.get_reference_file(model, "psf")
except crds.core.exceptions.CrdsLookupError:
psf_ref = "N/A"
if psf_ref != "N/A":
self.log.info(f"Using PSF reference file {psf_ref}")
return extract_ref, apcorr_ref, psf_ref
def _extract_soss(self, model):
"""
Extract NIRISS SOSS spectra.
Parameters
----------
model : DataModel
Input model.
Returns
-------
DataModel
The output spectra.
"""
# Set the filter configuration
if model.meta.instrument.filter == "CLEAR":
self.log.info("Exposure is through the GR700XD + CLEAR (science).")
soss_filter = "CLEAR"
else:
self.log.error(
"The SOSS extraction is implemented for the CLEAR filter only. "
f"Requested filter is {model.meta.instrument.filter}."
)
self.log.error("extract_1d will be skipped.")
model.meta.cal_step.extract_1d = "SKIPPED"
return model
# Set the subarray mode being processed
if model.meta.subarray.name == "SUBSTRIP256":
self.log.info("Exposure is in the SUBSTRIP256 subarray.")
self.log.info("Traces 1 and 2 will be modelled and decontaminated before extraction.")
subarray = "SUBSTRIP256"
elif model.meta.subarray.name == "SUBSTRIP96":
self.log.info("Exposure is in the SUBSTRIP96 subarray.")
self.log.info(
"Traces of orders 1 and 2 will be modelled but only order 1 "
"will be decontaminated before extraction."
)
subarray = "SUBSTRIP96"
else:
self.log.error(
"The SOSS extraction is implemented for the SUBSTRIP256 "
"and SUBSTRIP96 subarrays only. Subarray is currently "
f"{model.meta.subarray.name}."
)
self.log.error("Extract1dStep will be skipped.")
model.meta.cal_step.extract_1d = "SKIPPED"
return model
# Load reference files.
pastasoss_ref_name = self.get_reference_file(model, "pastasoss")
specprofile_ref_name = self.get_reference_file(model, "specprofile")
speckernel_ref_name = self.get_reference_file(model, "speckernel")
# Build SOSS kwargs dictionary.
soss_kwargs = {}
soss_kwargs["threshold"] = self.soss_threshold
soss_kwargs["n_os"] = self.soss_n_os
soss_kwargs["tikfac"] = self.soss_tikfac
soss_kwargs["width"] = self.soss_width
soss_kwargs["bad_pix"] = self.soss_bad_pix
soss_kwargs["subtract_background"] = self.subtract_background
soss_kwargs["rtol"] = self.soss_rtol
soss_kwargs["max_grid_size"] = self.soss_max_grid_size
soss_kwargs["wave_grid_in"] = self.soss_wave_grid_in
soss_kwargs["wave_grid_out"] = self.soss_wave_grid_out
soss_kwargs["estimate"] = self.soss_estimate
soss_kwargs["atoca"] = self.soss_atoca
# Set flag to output the model and the tikhonov tests
soss_kwargs["model"] = True if self.soss_modelname else False
# Run the extraction.
result, ref_outputs, atoca_outputs = soss_extract.run_extract1d(
model,
pastasoss_ref_name,
specprofile_ref_name,
speckernel_ref_name,
subarray,
soss_filter,
soss_kwargs,
)
# Set the step flag to complete
if result is None:
return None
else:
result.meta.cal_step.extract_1d = "COMPLETE"
result.meta.target.source_type = None
model.close()
if self.soss_modelname:
soss_modelname = self.make_output_path(
basepath=self.soss_modelname, suffix="SossExtractModel"
)
ref_outputs.save(soss_modelname)
if self.soss_modelname:
soss_modelname = self.make_output_path(
basepath=self.soss_modelname, suffix="AtocaSpectra"
)
atoca_outputs.save(soss_modelname)
return result
def _check_mrs_type(self, model):
"""
Check if the MIRI MRS data is for a single band.
Parameters
----------
model : DataModel
Input model.
Returns
-------
band_check : bool
Flag is data is for a single MIRI MRS band.
"""
# check channel is only 1 value and band is only 1 value
validch = ["1", "2", "3", "4"]
validband = ["short", "medium", "long"]
band_check = False
ch_check = False
b_check = False
if model.meta.instrument.channel in validch:
ch_check = True
if str(model.meta.instrument.band).lower() in validband:
b_check = True
if ch_check and b_check:
band_check = True
return band_check
def _extract_ifu(self, model, exp_type, extract_ref, apcorr_ref):
"""
Extract IFU spectra from a single datamodel.
Parameters
----------
model : DataModel
Input model.
exp_type : str
Exposure type.
extract_ref : str
Path to the EXTRACT1D reference file or N/A.
apcorr_ref : str
Path to the APCORR reference file or N/A.
Returns
-------
DataModel
The output spectra.
"""
source_type = model.meta.target.source_type
if self.ifu_set_srctype is not None and exp_type == "MIR_MRS":
source_type = self.ifu_set_srctype
self.log.info(f"Overriding source type and setting it to {self.ifu_set_srctype}")
if exp_type == "MIR_MRS":
band_cube = self._check_mrs_type(model)
if self.ifu_rfcorr and not band_cube:
message = (
"Turning off residual fringe correction for MIRI MRS data "
"because the input is not a single IFU band"
)
self.log.info(message)
self.ifu_rfcorr = False
else:
self.ifu_rfcorr = False
self.log.info(
"Turning off residual fringe correction because it only works on MIRI MRS data"
)
result = ifu_extract1d(
model,
extract_ref,
source_type,
self.subtract_background,
self.bkg_sigma_clip,
apcorr_ref,
self.center_xy,
self.ifu_autocen,
self.ifu_rfcorr,
self.ifu_rscale,
self.ifu_covar_scale,
)
return result
def _save_intermediate(self, intermediate_model, suffix, idx):
"""
Save an intermediate output file.
Parameters
----------
intermediate_model : DataModel
A model to save
suffix : str
Suffix to append to the output filename.
idx : int or None
Index to append to the output filename.
"""
if isinstance(intermediate_model, ModelContainer):
# Save the profile with the slit name + index + suffix 'profile'
for model in intermediate_model:
slit = str(model.name).lower()
if idx is not None:
complete_suffix = f"{slit}_{idx}_{suffix}"
else:
complete_suffix = f"{slit}_{suffix}"
output_path = self.make_output_path(suffix=complete_suffix)
self.log.info(f"Saving {suffix} {output_path}")
model.save(output_path)
else:
# Only one profile - just use the index and suffix 'profile'
if idx is not None:
complete_suffix = f"{idx}_{suffix}"
else:
complete_suffix = suffix
output_path = self.make_output_path(suffix=complete_suffix)
self.log.info(f"Saving {suffix} {output_path}")
intermediate_model.save(output_path)
intermediate_model.close()
[docs]
def process(self, input_data):
"""
Execute the step.
Parameters
----------
input_data : DataModel
The input model.
Returns
-------
DataModel
This will be `input_model` if the step was skipped; otherwise,
it will be a model containing 1-D extracted spectra.
"""
# Open the input and figure out what type of model it is
if isinstance(input_data, ModelContainer):
input_model = input_data
else:
input_model = datamodels.open(input_data)
if isinstance(
input_model,
(
datamodels.CubeModel,
datamodels.ImageModel,
datamodels.SlitModel,
datamodels.IFUCubeModel,
ModelContainer,
SourceModelContainer,
),
):
# Acceptable input type, just log it
self.log.debug(f"Input is a {str(input_model)}.")
elif isinstance(input_model, datamodels.MultiSlitModel):
# If input is multislit, with 3D calints, skip the step
self.log.debug("Input is a MultiSlitModel")
if len((input_model[0]).shape) == 3:
self.log.warning("3D input is unsupported; step will be skipped")
input_model.meta.cal_step.extract_1d = "SKIPPED"
return input_model
else:
self.log.error(f"Input is a {str(input_model)}, ")
self.log.error("which was not expected for extract_1d.")
self.log.error("The extract_1d step will be skipped.")
input_model.meta.cal_step.extract_1d = "SKIPPED"
return input_model
if not isinstance(input_model, ModelContainer):
exp_type = input_model.meta.exposure.type
# Make the input iterable
input_model = [input_model]
else:
exp_type = input_model[0].meta.exposure.type
self.log.debug(f"Input for EXP_TYPE {exp_type} contains {len(input_model)} items")
if len(input_model) > 1 and exp_type in extract.WFSS_EXPTYPES:
# For WFSS level-3, the input is a single entry of a
# SourceContainer, which contains a list of multiple
# SlitModels for a single source. Send the whole container
# into extract1d and put all results in a single product.
input_model = [input_model]
if exp_type == "NIS_SOSS":
# Data is NIRISS SOSS observation, use its own extraction routines
self.log.info(
"Input is a NIRISS SOSS observation, the specialized SOSS "
"extraction (ATOCA) will be used."
)
# There is only one input model for this mode
model = input_model[0]
result = self._extract_soss(model)
else:
result = ModelContainer()
for i, model in enumerate(input_model):
# Get the reference file names
extract_ref, apcorr_ref, psf_ref = self._get_extract_reference_files_by_mode(
model, exp_type
)
profile = None
scene_model = None
residual = None
if isinstance(model, datamodels.IFUCubeModel):
# Call the IFU specific extraction routine
extracted = self._extract_ifu(model, exp_type, extract_ref, apcorr_ref)
else:
# Call the general extraction routine
extracted, profile, scene_model, residual = extract.run_extract1d(
model,
extract_ref,
apcorr_ref,
psf_ref,
self.extraction_type,
self.smoothing_length,
self.bkg_fit,
self.bkg_order,
self.log_increment,
self.subtract_background,
self.use_source_posn,
self.position_offset,
self.model_nod_pair,
self.optimize_psf_location,
self.save_profile,
self.save_scene_model,
self.save_residual_image,
)
# Set the step flag to complete in each model
extracted.meta.cal_step.extract_1d = "COMPLETE"
result.append(extracted)
del extracted
# Save profile if needed
if len(input_model) > 1:
idx = i
else:
idx = None
if self.save_profile and profile is not None:
self._save_intermediate(profile, "profile", idx)
# Save model if needed
if self.save_scene_model and scene_model is not None:
self._save_intermediate(scene_model, "scene_model", idx)
# Save residual if needed
if self.save_residual_image and residual is not None:
self._save_intermediate(residual, "residual", idx)
# If only one result, return the model instead of the container
if len(result) == 1:
result = result[0]
if (
(self._input_filename is None)
and (not isinstance(result, ModelContainer))
and (result is not None)
and (self.output_file is None)
):
# Fix output file naming for WFSS multislitmodels that were originally generated
# from SourceModelContainer
self._input_filename = result.meta.filename
# For WFSS, reorder the x1d product to save it in the flat format
if exp_type in extract.WFSS_EXPTYPES:
result = make_wfss_multiexposure(result)
result.meta.cal_step.extract_1d = "COMPLETE"
return result