Source code for jwst.pipeline.calwebb_coron3

#!/usr/bin/env python
from collections import defaultdict
from pathlib import Path

from stdatamodels.jwst import datamodels

# step imports
from jwst.coron import align_refs_step, klip_step, stack_refs_step
from jwst.datamodels import ModelContainer, ModelLibrary
from jwst.model_blender import ModelBlender
from jwst.outlier_detection import outlier_detection_step
from jwst.resample import resample_step
from jwst.stpipe import Pipeline

__all__ = ["Coron3Pipeline"]


def to_container(model):
    """
    Convert a CubeModel to a ModelContainer of ImageModels for each plane.

    Parameters
    ----------
    model : CubeModel
        The input model to convert

    Returns
    -------
    container : ModelContainer
        The container of ImageModels
    """
    container = ModelContainer()
    for plane in range(model.shape[0]):
        image = datamodels.ImageModel()
        for attribute in [
            "data",
            "dq",
            "err",
            "zeroframe",
            "area",
            "var_poisson",
            "var_rnoise",
            "var_flat",
        ]:
            try:
                arr = model.getarray_noinit(attribute)
                if arr.ndim == 3:
                    setattr(image, attribute, arr[plane])
                elif arr.ndim == 2:
                    setattr(image, attribute, arr)
                else:
                    msg = f"Unexpected array shape for extension {attribute}: {arr.shape}"
                    raise ValueError(msg)
            except AttributeError:
                pass
        image.update(model)
        try:
            image.meta.wcs = model.meta.wcs
        except AttributeError:
            pass
        container.append(image)
    return container


[docs] class Coron3Pipeline(Pipeline): """ Apply all level-3 calibration steps to an association of coronagraphic exposures. Included steps are: stack_refs, align_refs, klip, outlier_detection, and resample. """ class_alias = "calwebb_coron3" spec = """ suffix = string(default='i2d') """ # noqa: E501 # Define aliases to steps step_defs = { "stack_refs": stack_refs_step.StackRefsStep, "align_refs": align_refs_step.AlignRefsStep, "klip": klip_step.KlipStep, "outlier_detection": outlier_detection_step.OutlierDetectionStep, "resample": resample_step.ResampleStep, } prefetch_references = False # Main processing
[docs] def process(self, user_input): """ Run the coron3 pipeline on the input data. Parameters ---------- user_input : str, Level3 Association, or `~jwst.datamodels.JwstDataModel` The exposure or association of exposures to process """ self.log.info("Starting calwebb_coron3 ...") asn_exptypes = ["science", "psf"] # Create a DM object using the association table input_models = datamodels.open(user_input, asn_exptypes=asn_exptypes) # This asn_id assignment is important as it allows outlier detection # to know the asn_id since that step receives the cube as input. self.asn_id = input_models.asn_table["asn_id"] # Store the output file for future use self.output_file = input_models.asn_table["products"][0]["name"] # Find all the member types in the product members_by_type = defaultdict(list) prod = input_models.asn_table["products"][0] for member in prod["members"]: members_by_type[member["exptype"].lower()].append(member["expname"]) # Set up required output products and formats self.outlier_detection.suffix = "crfints" self.outlier_detection.mode = "coron" self.outlier_detection.save_results = self.save_results self.resample.blendheaders = False # Extract lists of all the PSF and science target members psf_files = members_by_type["psf"] targ_files = members_by_type["science"] # Make sure we found some PSF and target members if len(psf_files) == 0: err_str1 = "No reference PSF members found in association table." self.log.error(err_str1) self.log.error("Calwebb_coron3 processing will be aborted") return if len(targ_files) == 0: err_str1 = "No science target members found in association table" self.log.error(err_str1) self.log.error("Calwebb_coron3 processing will be aborted") return for member in psf_files + targ_files: self.prefetch(member) # Assemble all the input psf files into a single ModelContainer # and run outlier detection if desired psf_models = ModelContainer() for i in range(len(psf_files)): psf_input = datamodels.CubeModel(psf_files[i]) if not self.outlier_detection.skip: psf_input = self.outlier_detection.run(psf_input) psf_models.append(psf_input) # Stack all the PSF images into a single CubeModel psf_stack = self.stack_refs.run(psf_models) psf_models.close() del psf_models # Save the resulting PSF stack self.save_model(psf_stack, suffix="psfstack") # Call the sequence of steps: outlier_detection, align_refs, and klip # once for each input target exposure resample_input = ModelContainer() model_blender = ModelBlender() for target_file in targ_files: with datamodels.open(target_file) as target: model_blender.accumulate(target) # Remove outliers from the target if not self.outlier_detection.skip: target = self.outlier_detection.run(target) # Call align_refs psf_aligned = self.align_refs.run(target, psf_stack) # Save the alignment results self.save_model( psf_aligned, output_file=target_file, suffix="psfalign", acid=self.asn_id ) # Call KLIP psf_sub = self.klip.run(target, psf_aligned) psf_aligned.close() # Save the psf subtraction results self.save_model(psf_sub, output_file=target_file, suffix="psfsub", acid=self.asn_id) # Split out the integrations into separate models # in a ModelContainer to pass to `resample` for model in to_container(psf_sub): resample_input.append(model) # Call the resample step to combine all psf-subtracted target images # for compatibility with image3 pipeline use of ModelLibrary, # convert ModelContainer to ModelLibrary resample_library = ModelLibrary(resample_input, on_disk=False) # Output is a single datamodel result = self.resample.run(resample_library) # Blend the science headers try: completed = result.meta.cal_step.resample except AttributeError: self.log.debug("Could not determine if resample was completed.") self.log.debug("Presuming not.") completed = "SKIPPED" if completed == "COMPLETE": self.log.debug(f"Blending metadata for {result}") model_blender.finalize_model(result) try: result.meta.asn.pool_name = input_models.asn_pool_name result.meta.asn.table_name = Path(user_input).name except AttributeError: self.log.debug("Cannot set association information on final") self.log.debug(f"result {result}") # Save the final result self.save_model(result, suffix=self.suffix) # We're done self.log.info("...ending calwebb_coron3") return