Source code for jwst.master_background.master_background_step

from pathlib import Path

import numpy as np
from scipy.signal import medfilt
from stdatamodels.jwst import datamodels
from stdatamodels.properties import merge_tree

from jwst.combine_1d.combine1d import combine_1d_spectra
from jwst.datamodels import ModelContainer
from jwst.master_background.expand_to_2d import expand_to_2d
from jwst.stpipe import Step, record_step_status

__all__ = ["MasterBackgroundStep"]


[docs] class MasterBackgroundStep(Step): """Compute and subtract master background from spectra.""" class_alias = "master_background" spec = """ median_kernel = integer(default=1) # Moving-median boxcar size to filter the background user_background = string(default=None) # Path to user-supplied master background save_background = boolean(default=False) # Save computed master background force_subtract = boolean(default=False) # Force subtracting master background output_use_model = boolean(default=True) """ # noqa: E501
[docs] def process(self, input_data): """ Compute and subtract a master background spectrum. Parameters ---------- input_data : `~jwst.datamodels.ImageModel`, `~jwst.datamodels.IFUImageModel`, \ `~jwst.datamodels.container.ModelContainer`, str Input target datamodel(s) or association file to which master background subtraction is to be applied. Returns ------- result : `~jwst.datamodels.ImageModel`, `~jwst.datamodels.IFUImageModel`, \ `~jwst.datamodels.container.ModelContainer` The background-subtracted science datamodel(s) """ with datamodels.open(input_data) as input_model: # Make the input data available to self self.input_model = input_model # First check if we should even do the subtraction. If not, bail. if not self._do_sub: result = input_model.copy() record_step_status(result, "master_background", success=False) return result # Check that data is a supported datamodel. If not, bail. if not isinstance( input_model, ( ModelContainer, datamodels.MultiSlitModel, datamodels.ImageModel, datamodels.IFUImageModel, ), ): result = input_model.copy() self.log.warning( f"Input {input_data} of type {type(input_data)} cannot be handled. " f"Step skipped." ) record_step_status(result, "master_background", success=False) return result # If user-supplied master background, subtract it if self.user_background: if isinstance(input_model, ModelContainer): input_model, _ = split_container(input_model) asn_id = input_model.asn_table["asn_id"] del _ result = ModelContainer() background_2d_collection = ModelContainer() for model in input_model: background_2d = expand_to_2d(model, self.user_background) result.append(subtract_2d_background(model, background_2d)) background_2d_collection.append(background_2d) # Record name of user-supplied master background spectrum for model in result: model.meta.background.master_background_file = Path( self.user_background ).name # Use user-supplied master background and subtract it else: asn_id = None background_2d = expand_to_2d(input_model, self.user_background) background_2d_collection = ModelContainer([background_2d]) result = subtract_2d_background(input_model, background_2d) # Record name of user-supplied master background spectrum result.meta.background.master_background_file = Path(self.user_background).name # Save the computed 2d background if requested by user. The user has supplied # the master background so just save the expanded 2d background if self.save_background: self.save_container( background_2d_collection, suffix="masterbg2d", force=True, asn_id=asn_id ) # Compute master background and subtract it else: if isinstance(input_model, ModelContainer): input_model, background_data = split_container(input_model) if len(background_data) == 0: msg = ( "No background data found in input container, " "and no user-supplied background provided. Skipping step." ) self.log.warning(msg) result = input_model.copy() record_step_status(result, "master_background", success=False) return result asn_id = input_model.asn_table["asn_id"] for model in background_data: # Check if the background members are nodded x1d extractions # or background from dedicated background exposures. # Use "bkgdtarg == False" so we don't also get None cases # for simulated data that didn't bother populating this # keyword. this_is_ifu_extended = False if ( model.meta.exposure.type == "NRS_IFU" and model.spec[0].source_type == "EXTENDED" ): this_is_ifu_extended = True if model.meta.exposure.type == "MIR_MRS": # always treat as extended for MIRI MRS this_is_ifu_extended = True if model.meta.observation.bkgdtarg is False or this_is_ifu_extended: self.log.debug("Copying BACKGROUND column to SURF_BRIGHT") copy_background_to_surf_bright(model) master_background = combine_1d_spectra( background_data, exptime_key="exposure_time", ) # If requested, apply a moving-median boxcar filter to the # master background spectrum. # Round down even kernel sizes because only odd kernel sizes are supported. if self.median_kernel % 2 == 0: self.median_kernel -= 1 self.log.info( "Even median filter kernels are not supported." f" Rounding the median kernel size down to {self.median_kernel}." ) if self.median_kernel > 1: self.log.info( f"Applying moving-median boxcar of width {self.median_kernel}." ) master_background.spec[0].spec_table["surf_bright"] = medfilt( master_background.spec[0].spec_table["surf_bright"], kernel_size=[self.median_kernel], ) master_background.spec[0].spec_table["flux"] = medfilt( master_background.spec[0].spec_table["flux"], kernel_size=[self.median_kernel], ) background_data.close() result = ModelContainer() background_2d_collection = ModelContainer() for model in input_model: background_2d = expand_to_2d(model, master_background) result.append(subtract_2d_background(model, background_2d)) background_2d_collection.append(background_2d) input_model.close() else: result = input_model.copy() input_model.close() self.log.warning( f"Input {input_data} of type {type(input_data)} cannot be " "handled without user-supplied background. Step skipped." ) record_step_status(result, "master_background", success=False) return result # Save the computed background if requested by user if self.save_background: self.save_model( master_background, suffix="masterbg1d", force=True, asn_id=asn_id ) self.save_container( background_2d_collection, suffix="masterbg2d", force=True, asn_id=asn_id ) record_step_status(result, "master_background", success=True) return result
@property def _do_sub(self): """ Decide if subtraction is to be done. Encapsulates logic that checks if background step has already been run on the data, or if the user has selected to force_subtract regardless. Returns ------- do_sub : bool If ``True``, do the subtraction """ do_sub = True if not self.force_subtract: input_model = self.input_model # check if the input data is a model container. If it is then loop over # container and see if the background was subtracted in calspec2. # If all data was background subtracted, skip master bgk subtraction. # If there is a mixture of some being background subtracted, don't # subtract and print warning message if isinstance(input_model, ModelContainer): isub = 0 for indata in input_model: if ( indata.meta.cal_step.bkg_subtract == "COMPLETE" or indata.meta.cal_step.master_background == "COMPLETE" ): do_sub = False isub += 1 if not do_sub and isub == len(input_model): self.log.info( "Not subtracting master background, background was subtracted in calspec2" ) self.log.info( "To force the master background to be subtracted from this data, " "run again and set force_subtract = True." ) if not do_sub and isub != len(input_model): self.log.warning("Not subtracting master background.") self.log.warning( "Input data contains a mixture of data with and without " "background subtraction done in calspec2." ) self.log.warning( "To force the master background to be subtracted from this data, " "run again and set force_subtract = True." ) # input data is a single file else: if ( input_model.meta.cal_step.bkg_subtract == "COMPLETE" or input_model.meta.cal_step.master_background == "COMPLETE" ): do_sub = False self.log.info( "Not subtracting master background, background was subtracted in calspec2" ) self.log.info( "To force the master background to be subtracted from this data, " "run again and set force_subtract = True." ) return do_sub
[docs] def save_container(self, container, suffix="", asn_id="", force=True): """Save all models in container for intermediate background subtraction.""" for i, model in enumerate(container): self.save_model(model, suffix=suffix, force=force, asn_id=asn_id, idx=i)
def copy_background_to_surf_bright(spectrum): """Copy the background column to the surf_bright column in a MultiSpecModel in-place.""" for spec in spectrum.spec: spec.spec_table["SURF_BRIGHT"][:] = spec.spec_table["BACKGROUND"].copy() spec.spec_table["SB_ERROR"][:] = spec.spec_table["BKGD_ERROR"].copy() # Zero out the background column for safety spec.spec_table["BACKGROUND"][:] = 0 # Set BERROR to dummy val of 0.0, as in extract_1d currently spec.spec_table["BKGD_ERROR"][:] = 0.0 def split_container(container): """ Divide a ModelContainer with science and background into one of each. Parameters ---------- container : ModelContainer Input model container Returns ------- science : ModelContainer Container for science data. background : ModelContainer Container for background data. """ background = ModelContainer() science = ModelContainer() for ind_science in container.ind_asn_type("science"): science.append(container[ind_science]) for ind_bkgd in container.ind_asn_type("background"): background.append(container[ind_bkgd]) # Pass along the association table to the output science container science.asn_pool_name = container.asn_pool_name science.asn_table_name = container.asn_table_name merge_tree(science.asn_table, container.asn_table) # Prune the background members from the table for p in science.asn_table["products"]: p["members"] = [m for m in p["members"] if m["exptype"].lower() != "background"] return science, background def subtract_2d_background(source, background): """ Subtract a 2D background. Parameters ---------- source : `~jwst.datamodels.JwstDataModel` or `~jwst.datamodels.container.ModelContainer` The input science data. background : `~jwst.datamodels.JwstDataModel` The input background data. Must be the same datamodel type as `source`. For a `~jwst.datamodels.container.ModelContainer`, the source and background models in the input containers must match one-to-one. Returns ------- `~jwst.datamodels.JwstDataModel` Background subtracted from source. """ def _subtract_2d_background(model, background): result = model.copy() # Handle individual NIRSpec FS, NIRSpec MOS if isinstance(model, datamodels.MultiSlitModel): for slit, slitbg in zip(result.slits, background.slits, strict=False): slit.data -= slitbg.data slit.dq = np.bitwise_or(slit.dq, slitbg.dq) # Handle MIRI LRS, MIRI MRS and NIRSpec IFU elif isinstance(model, (datamodels.ImageModel, datamodels.IFUImageModel)): result.data -= background.data result.dq = np.bitwise_or(result.dq, background.dq) else: # Shouldn't get here. raise TypeError(f"Input type {type(model)} is not supported.") return result # Handle containers of many datamodels if isinstance(source, ModelContainer): result = ModelContainer() result.update(source) for model, bg in zip(source, background, strict=False): result.append(_subtract_2d_background(model, bg)) # Handle single datamodels elif isinstance( source, (datamodels.ImageModel, datamodels.IFUImageModel, datamodels.MultiSlitModel) ): result = _subtract_2d_background(source, background) else: # Shouldn't get here. raise TypeError(f"Input type {type(source)} is not supported.") return result