Source code for jwst.pipeline.calwebb_image2

#!/usr/bin/env python
from collections import defaultdict
from pathlib import Path

from stdatamodels.jwst import datamodels

from jwst.assign_wcs import assign_wcs_step
from jwst.associations.load_as_asn import LoadAsLevel2Asn

# calwebb IMAGE2 step imports
from jwst.background import background_step
from jwst.flatfield import flat_field_step
from jwst.photom import photom_step
from jwst.resample import resample_step
from jwst.stpipe import Pipeline

__all__ = ["Image2Pipeline"]


[docs] class Image2Pipeline(Pipeline): """ Process JWST imaging-mode slope data from Level-2a to Level-2b. Included steps are: bkg_subtract, assign_wcs, flat_field, photom, and resample. """ class_alias = "calwebb_image2" spec = """ save_bsub = boolean(default=False) # Save background-subtracted science """ # noqa: E501 # Define alias to steps step_defs = { "bkg_subtract": background_step.BackgroundStep, "assign_wcs": assign_wcs_step.AssignWcsStep, "flat_field": flat_field_step.FlatFieldStep, "photom": photom_step.PhotomStep, "resample": resample_step.ResampleStep, } # List of normal imaging exp_types image_exptypes = ["MIR_IMAGE", "NRC_IMAGE", "NIS_IMAGE", "FGS_IMAGE"]
[docs] def process(self, input_data): """ Run the Image2Pipeline on the input data. Parameters ---------- input_data : str, Level2 Association, or DataModel The exposure or association of exposures to process. Returns ------- list[JWSTDataModel] The calibrated data models. """ self.log.info("Starting calwebb_image2 ...") # Retrieve the input(s) asn = LoadAsLevel2Asn.load(input_data, basename=self.output_file) if len(asn["products"]) > 1 and self.output_file is not None: self.log.warning( "Multiple products in input association. Output file name will be ignored." ) self.output_file = None # Each exposure is a product in the association. # Process each exposure. results = [] for product in asn["products"]: self.log.info("Processing product {}".format(product["name"])) if (self.save_results) & (self.output_file is None): self.output_file = product["name"] if not hasattr(asn, "filename"): asn.filename = "singleton" result = self.process_exposure_product( product, asn["asn_pool"], Path(asn.filename).name ) # Save result suffix = "cal" if isinstance(result, datamodels.CubeModel): suffix = "calints" result.meta.filename = self.make_output_path(basepath=self.output_file, suffix=suffix) results.append(result) self.output_file = None self.log.info("... ending calwebb_image2") self.output_use_model = True self.suffix = False return results
# Process each exposure
[docs] def process_exposure_product( self, exp_product, pool_name=" ", asn_file=" ", ): """ Calibrate an exposure found in the association product. Parameters ---------- exp_product : dict A Level2b association product. pool_name : str The pool file name. Used for recording purposes only. asn_file : str The name of the association file. Used for recording purposes only. Returns ------- JWSTDataModel The final calibrated product. """ # Find all the member types in the product members_by_type = defaultdict(list) for member in exp_product["members"]: members_by_type[member["exptype"].lower()].append(member["expname"]) # Get the science member. Technically there should only be # one. We'll just get the first one found. science = members_by_type["science"] if len(science) != 1: self.log.warning( "Wrong number of science files found in {}".format(exp_product["name"]) ) self.log.warning(" Using only first one.") science = science[0] self.log.info("Working on input %s ...", science) if isinstance(science, datamodels.JwstDataModel): input_data = science else: input_data = datamodels.open(science) # Record ASN pool and table names in output input_data.meta.asn.pool_name = pool_name input_data.meta.asn.table_name = asn_file input_data.meta.filename = self.make_output_path(basepath=self.output_file) # Do background processing, if necessary if len(members_by_type["background"]) > 0: # Setup for saving self.bkg_subtract.suffix = "bsub" if isinstance(input_data, datamodels.CubeModel): self.bkg_subtract.suffix = "bsubints" # Backwards compatibility if self.save_bsub: self.bkg_subtract.save_results = True # Call the background subtraction step input_data = self.bkg_subtract.run(input_data, members_by_type["background"]) # work on slope images input_data = self.assign_wcs.run(input_data) input_data = self.flat_field.run(input_data) input_data = self.photom.run(input_data) # Resample individual exposures, but only if it's one of the # regular 2D science image types if ( input_data.meta.exposure.type.upper() in self.image_exptypes and len(input_data.data.shape) == 2 ): self.resample.save_results = self.save_results self.resample.suffix = "i2d" self.resample.run(input_data) # That's all folks self.log.info("Finished processing product {}".format(exp_product["name"])) return input_data