#!/usr/bin/env python
import logging
from collections import defaultdict
from pathlib import Path
import numpy as np
import stdatamodels.jwst.datamodels as dm
from scipy.spatial import ConvexHull, QhullError
from stcal.alignment.util import sregion_to_footprint
# step imports
from jwst.assign_mtwcs import assign_mtwcs_step
from jwst.associations.lib.rules_level3_base import format_product
from jwst.combine_1d import combine_1d_step
from jwst.cube_build import cube_build_step
from jwst.datamodels import SourceModelContainer
from jwst.datamodels.utils.wfss_multispec import make_wfss_multicombined, make_wfss_multiexposure
from jwst.exp_to_source import multislit_to_container
from jwst.extract_1d import extract_1d_step
from jwst.lib.exposure_types import is_moving_target
from jwst.master_background import master_background_step
from jwst.master_background.master_background_step import split_container
from jwst.outlier_detection import outlier_detection_step
from jwst.photom import photom_step
from jwst.pixel_replace import pixel_replace_step
from jwst.resample import resample_spec_step
from jwst.spectral_leak import spectral_leak_step
from jwst.stpipe import Pipeline, query_step_status
from jwst.stpipe.utilities import invariant_filename
log = logging.getLogger(__name__)
__all__ = ["Spec3Pipeline"]
# Group exposure types
IFU_EXPTYPES = ["MIR_MRS", "NRS_IFU"]
SLITLESS_TYPES = ["NIS_SOSS", "NIS_WFSS", "NRC_WFSS"]
WFSS_TYPES = ["NIS_WFSS", "NRC_WFSS"]
[docs]
class Spec3Pipeline(Pipeline):
"""
Process JWST spectroscopic exposures from Level 2b to 3.
Included steps are: assign_mtwcs, master_background,
outlier_detection, pixel_replace, resample_spec, cube_build,
extract_1d, photom, combine_1d, and spectral_leak.
"""
class_alias = "calwebb_spec3"
spec = """
""" # noqa: E501
# Define aliases to steps
step_defs = {
"assign_mtwcs": assign_mtwcs_step.AssignMTWcsStep,
"master_background": master_background_step.MasterBackgroundStep,
"outlier_detection": outlier_detection_step.OutlierDetectionStep,
"pixel_replace": pixel_replace_step.PixelReplaceStep,
"resample_spec": resample_spec_step.ResampleSpecStep,
"cube_build": cube_build_step.CubeBuildStep,
"extract_1d": extract_1d_step.Extract1dStep,
"photom": photom_step.PhotomStep,
"combine_1d": combine_1d_step.Combine1dStep,
"spectral_leak": spectral_leak_step.SpectralLeakStep,
}
# Main processing
[docs]
def process(self, input_data):
"""
Run the Spec3Pipeline on the input data.
Parameters
----------
input_data : str, Level3 Association, or `~jwst.datamodels.JwstDataModel`
The exposure or association of exposures to process
"""
self.log.info("Starting calwebb_spec3 ...")
asn_exptypes = ["science", "background"]
# Setup sub-step defaults
self.master_background.suffix = "mbsub"
self.outlier_detection.suffix = "crf"
self.outlier_detection.save_results = self.save_results
self.resample_spec.suffix = "s2d"
self.resample_spec.save_results = self.save_results
self.cube_build.suffix = "s3d"
self.cube_build.save_results = self.save_results
self.extract_1d.suffix = "x1d"
self.extract_1d.save_results = self.save_results
self.combine_1d.suffix = "c1d"
self.combine_1d.save_results = self.save_results
self.spectral_leak.suffix = "x1d"
self.spectral_leak.save_results = self.save_results
self.pixel_replace.suffix = "pixel_replace"
self.pixel_replace.output_use_model = True
# Overriding the Step.save_model method for the following steps.
# These steps save intermediate files, resulting in meta.filename
# being modified. This can affect the filenames of subsequent
# steps.
self.outlier_detection.save_model = invariant_filename(self.outlier_detection.save_model)
self.pixel_replace.save_model = invariant_filename(self.pixel_replace.save_model)
# Retrieve the inputs:
# could either be done via LoadAsAssociation and then manually
# load input members into models and ModelContainer, or just
# do a direct open of all members in ASN file, e.g.
input_models = dm.open(input_data, asn_exptypes=asn_exptypes)
# Immediately update the ASNTABLE keyword value in all inputs,
# so that all outputs get the new value
for model in input_models:
model.meta.asn.table_name = Path(input_models.asn_table_name).name
# For the first round of development we will assume that the input
# is ALWAYS an ASN. There's no use case for anyone ever running a
# single exposure through.
# Once data are loaded, store a few things for future use;
# some of this is here only for the purpose of creating fake
# products until the individual tasks work and do it themselves
exptype = input_models[0].meta.exposure.type
output_file = input_models.asn_table["products"][0]["name"]
self.output_file = output_file
# Find all the member types in the product
members_by_type = defaultdict(list)
product = input_models.asn_table["products"][0]
for member in product["members"]:
members_by_type[member["exptype"].lower()].append(member["expname"])
if is_moving_target(input_models[0]):
self.log.info("Assigning WCS to a Moving Target exposure.")
# assign_mtwcs modifies input_models in-place
self.assign_mtwcs.run(input_models)
# If background data are present, call the master background step
if members_by_type["background"]:
source_models = self.master_background.run(input_models)
source_models.asn_table = input_models.asn_table
# If the step is skipped, do the container splitting that
# would've been done in master_background
if query_step_status(source_models, "master_background") == "SKIPPED":
source_models, bkg_models = split_container(input_models)
# we don't need the background members
bkg_models.close()
del bkg_models
else:
# The input didn't contain any background members,
# so we use all the inputs in subsequent steps
source_models = input_models
# `sources` is the list of astronomical sources that need be
# processed. Each element is a ModelContainer, which contains
# models for all exposures that belong to a single source.
#
# For JWST spectral modes, the input associations can contain
# one of two types of collections. If the exposure type is
# considered single-source, then the association contains only
# exposures of that source.
#
# However, there are modes in which the exposures contain data
# from multiple sources. In that case, the data must be
# rearranged, collecting the exposures representing each
# source into its own ModelContainer. This produces a list of
# sources, each represented by a MultiExposureModel instead of
# a single ModelContainer.
sources = [source_models]
if isinstance(input_models[0], dm.MultiSlitModel):
self.log.info("Convert from exposure-based to source-based data.")
sources = list(multislit_to_container(source_models).items())
# Process each source
wfss_x1d = []
wfss_comb = []
for source in sources:
# If each source is a SourceModelContainer,
# the output name needs to be updated based on the source ID,
# and potentially also the slit name (for NIRSpec fixed-slit only).
if isinstance(source, tuple):
source_id, result = source
# NIRSpec fixed-slit data
if exptype == "NRS_FIXEDSLIT":
# Output file name is constructed using the source_id and the slit name
slit_name = self._create_nrsfs_slit_name(result)
srcid = f"s{source_id:>09s}"
self.output_file = format_product(
output_file, source_id=srcid, slit_name=slit_name
)
# NIRSpec MOS/MSA data
elif exptype == "NRS_MSASPEC":
# Construct the specially formatted source_id to use in the output file
# name that separates source, background, and virtual slits
srcid = self._create_nrsmos_source_id(result)
self.output_file = format_product(output_file, source_id=srcid)
self.log.debug(f"output_file = {self.output_file}")
else:
# All other types just use the source_id directly in the file name
srcid = f"s{source_id:>09s}"
self.output_file = format_product(output_file, source_id=srcid)
else:
result = source
# The MultiExposureModel is a required output, except for WFSS modes.
if isinstance(result, SourceModelContainer) and (exptype not in WFSS_TYPES):
self.save_model(result, "cal")
# Call outlier detection and pixel replacement
resample_complete = None
if exptype not in SLITLESS_TYPES:
# Update the asn table name to the level 3 instance so that
# the downstream products have the correct table name since
# the _cal files are not saved they will not be updated
for cal_array in result:
cal_array.meta.asn.table_name = Path(input_models.asn_table_name).name
if exptype in IFU_EXPTYPES:
self.outlier_detection.mode = "ifu"
else:
self.outlier_detection.mode = "spec"
result = self.outlier_detection.run(result)
# interpolate pixels that have a NaN value or are flagged
# as DO_NOT_USE or NON_SCIENCE.
result = self.pixel_replace.run(result)
# Resample time. Dependent on whether the data is IFU or not.
if exptype in IFU_EXPTYPES:
result = self.cube_build.run(result)
try:
resample_complete = result[0].meta.cal_step.cube_build
except AttributeError:
pass
else:
result = self.resample_spec.run(result)
try:
resample_complete = result.meta.cal_step.resample
except AttributeError:
pass
# Do 1-D spectral extraction
if exptype in SLITLESS_TYPES:
# interpolate pixels that have a NaN value or are flagged
# as DO_NOT_USE or NON_SCIENCE
result = self.pixel_replace.run(result)
# For slitless data, extract 1D spectra and then combine them
if exptype in ["NIS_SOSS"]:
# For NIRISS SOSS, don't save the extract_1d results,
# instead run photom on the extract_1d results and save
# those instead.
self.extract_1d.save_results = False
result = self.extract_1d.run(result)
# Check whether extraction was completed
extraction_complete = (
result is not None and result.meta.cal_step.extract_1d == "COMPLETE"
)
# SOSS F277W or FULL frame may return None - don't bother with that.
if extraction_complete:
self.photom.save_results = self.save_results
self.photom.suffix = "x1d"
result = self.photom.run(result)
result = self.combine_1d.run(result)
else:
# for WFSS modes, do not save the results with one file per source
# instead compile the results over the for loop to be put into a single file
# at the end.
self.extract_1d.save_results = False
self.combine_1d.save_results = False
result = self.extract_1d.run(result)
wfss_x1d.append(result)
# Check whether extraction was completed
extraction_complete = (
result is not None and result.meta.cal_step.extract_1d == "COMPLETE"
)
if not extraction_complete:
continue
# Combine the results for all sources
comb = self.combine_1d.run(result)
comb_complete = comb is not None and comb.meta.cal_step.combine_1d == "COMPLETE"
if not comb_complete:
continue
# add metadata that only WFSS wants
comb.spec[0].source_ra = result.spec[0].spec_table["SOURCE_RA"][0]
comb.spec[0].source_dec = result.spec[0].spec_table["SOURCE_DEC"][0]
wfss_comb.append(comb)
elif resample_complete is not None and resample_complete.upper() == "COMPLETE":
# If 2D data were resampled and combined, just do a 1D extraction
if exptype in IFU_EXPTYPES:
self.extract_1d.search_output_file = False
if exptype in ["MIR_MRS"]:
if not self.spectral_leak.skip:
self.extract_1d.save_results = False
self.spectral_leak.suffix = "x1d"
self.spectral_leak.search_output_file = False
self.spectral_leak.save_results = self.save_results
result = self.extract_1d.run(result)
if exptype in ["MIR_MRS"]:
result = self.spectral_leak.run(result)
elif exptype not in IFU_EXPTYPES:
# Extract spectra and combine results
result = self.extract_1d.run(result)
result = self.combine_1d.run(result)
else:
self.log.warning("Resampling was not completed. Skipping extract_1d.")
# Save the final output products for WFSS modes
if exptype in WFSS_TYPES:
if self.save_results:
x1d_output = make_wfss_multiexposure(wfss_x1d)
self._populate_wfss_sregion(x1d_output, input_models)
x1d_filename = output_file + "_x1d.fits"
self.log.info(f"Saving the final x1d product as {x1d_filename}.")
x1d_output.save(x1d_filename)
if self.save_results:
c1d_output = make_wfss_multicombined(wfss_comb)
c1d_filename = output_file + "_c1d.fits"
self.log.info(f"Saving the final c1d product as {c1d_filename}.")
c1d_output.save(c1d_filename)
input_models.close()
self.log.info("Ending calwebb_spec3")
return
def _create_nrsfs_slit_name(self, source_models):
"""
Create the complete slit_name product field for NIRSpec fixed-slit products.
Each unique value of slit name within the list of input source models
is appended to the final slit name string.
Parameters
----------
source_models : list of `~stdatamodels.DataModel`
List of input source models.
Returns
-------
slit_name : str
The complete slit name string to use in the output product name.
"""
slit_names = []
slit_names.append(source_models[0].name.lower())
for i in range(len(source_models)):
name = source_models[i].name.lower()
if name not in slit_names:
slit_names.append(name)
slit_name = "-".join(slit_names) # append slit names using a dash separator
return slit_name
def _create_nrsmos_source_id(self, source_models):
"""
Create the complete source_id product field for NIRSpec MOS products.
The source_id value gets a "s", "b", or "v" character prepended
to uniquely identify source, background, and virtual slits.
Parameters
----------
source_models : list of `~stdatamodels.DataModel`
List of input source models.
Returns
-------
slit_name : str
The complete slit name string to use in the output product name.
"""
# Get the original source name and ID from the input models
source_name = source_models[0].source_name
source_id = source_models[0].source_id
# MOS background sources have "BKG" in the source name
if "BKG" in source_name:
# prepend "b" to the source_id number and format to 9 chars
srcid = f"b{str(source_id):>09s}"
self.log.debug(f"Source {source_name} is a MOS background slitlet: ID={srcid}")
# MOS virtual sources have a negative source_id value
elif source_id < 0:
# prepend "v" to the source_id number and remove the leading negative sign
srcid = f"v{str(source_id)[1:]:>09s}"
self.log.debug(f"Source {source_name} is a MOS virtual slitlet: ID={srcid}")
# Regular MOS sources
else:
# prepend "s" to the source_id number and format to 9 chars
srcid = f"s{str(source_id):>09s}"
return srcid
def _populate_wfss_sregion(self, wfss_model, cal_model_list):
"""
Generate cumulative S_REGION footprint from input grism images.
This takes the input model S_REGION vertices, generates a convex hull
from those points and returns the vertices corresponding to that hull
in counterclockwise order.
Parameters
----------
wfss_model : ~datamodels.WfssMultiExposureModel
The newly generated WfssMultiExposureModel made as part of
the save operation for spec3 processing of WFSS data.
cal_model_list : list(~datamodels.MultiSlitModel)
The list of input_models provided to Spec3Pipeline by the
input association.
"""
# Make array of S_REGION vertices from input model values, then generate convex hull
try:
input_sregion_vertices = np.concatenate(
[sregion_to_footprint(w.meta.wcsinfo.s_region) for w in cal_model_list]
)
convex_sregion_hull = ConvexHull(input_sregion_vertices)
except AttributeError as err:
log.warning(
"Missing S_REGION info in input files. Skipping S_REGION assignment for x1d output."
)
log.debug(err)
return
except QhullError as qerr:
log.warning(
"Error generating convex hull from input S_REGION vertices. Skipping S_REGION "
"assignment for x1d output."
)
log.debug(qerr)
return
# Index vertices on those selected by ConvexHull
# By default, ConvexHull vertices are returned in counterclockwise order
convex_vertices = input_sregion_vertices[convex_sregion_hull.vertices]
s_region = "POLYGON ICRS " + " ".join([f"{x:.9f}" for x in convex_vertices.flatten()])
# Populate S_REGION in first entry of output model spec list.
wfss_model.spec[0].s_region = s_region