Source code for jwst.jump.jump_step

#! /usr/bin/env python

import time

import numpy as np
from stcal.jump.jump import detect_jumps_data
from stcal.jump.jump_class import JumpData
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import dqflags

from jwst.lib import reffile_utils
from jwst.stpipe import Step

__all__ = ["JumpStep"]


[docs] class JumpStep(Step): """Perform jump detection using two point difference.""" spec = """ rejection_threshold = float(default=4.0,min=0) # CR sigma rejection threshold three_group_rejection_threshold = float(default=6.0,min=0) # CR sigma rejection threshold four_group_rejection_threshold = float(default=5.0,min=0) # CR sigma rejection threshold maximum_cores = string(default='1') # cores for multiprocessing. Can be an integer, 'half', 'quarter', or 'all' flag_4_neighbors = boolean(default=True) # flag the four perpendicular neighbors of each CR max_jump_to_flag_neighbors = float(default=1000) # maximum jump sigma that will trigger neighbor flagging min_jump_to_flag_neighbors = float(default=10) # minimum jump sigma that will trigger neighbor flagging after_jump_flag_dn1 = float(default=0) # 1st flag groups after jump above DN threshold after_jump_flag_time1 = float(default=0) # 1st flag groups after jump groups within specified time after_jump_flag_dn2 = float(default=0) # 2nd flag groups after jump above DN threshold after_jump_flag_time2 = float(default=0) # 2nd flag groups after jump groups within specified time expand_large_events = boolean(default=False) # Turns on Snowball detector for NIR detectors min_sat_area = float(default=1.0) # minimum required area for the central saturation of snowballs min_jump_area = float(default=5.0) # minimum area to trigger large events processing expand_factor = float(default=2.0) # The expansion factor for the enclosing circles or ellipses use_ellipses = boolean(default=False) # deprecated sat_required_snowball = boolean(default=True) # Require the center of snowballs to be saturated min_sat_radius_extend = float(default=2.5) # The min radius of the sat core to trigger the extension of the core sat_expand = integer(default=2) # Number of pixels to add to the radius of the saturated core of snowballs edge_size = integer(default=25) # Distance from detector edge where a saturated core is not required for snowball detection mask_snowball_core_next_int = boolean(default=True) # Flag saturated cores of snowballs in the next integration? snowball_time_masked_next_int = integer(default=4000) # Time in seconds over which saturated cores are flagged in next integration find_showers = boolean(default=False) # Apply MIRI shower flagging? max_shower_amplitude = float(default=4) # Maximum MIRI shower amplitude in DN/s extend_snr_threshold = float(default=1.2) # The SNR minimum for detection of extended showers in MIRI extend_min_area = integer(default=90) # Min area of emission after convolution for the detection of showers extend_inner_radius = float(default=1) # Inner radius of the ring_2D_kernel used for convolution extend_outer_radius = float(default=2.6) # Outer radius of the ring_2D_Kernel used for convolution extend_ellipse_expand_ratio = float(default=1.1) # Expand the radius of the ellipse fit to the extended emission time_masked_after_shower = float(default=15) # Seconds to flag as jump after a detected extended emission min_diffs_single_pass = integer(default=10) # The minimum number of differences needed to skip the iterative flagging of jumps. max_extended_radius = integer(default=200) # The maximum radius of an extended snowball or shower minimum_groups = integer(default=3) # The minimum number of groups to perform jump detection using sigma clipping minimum_sigclip_groups = integer(default=100) # The minimum number of groups to switch to sigma clipping only_use_ints = boolean(default=True) # In sigclip only compare the same group across ints, if False compare all groups """ # noqa: E501 reference_file_types = ["gain", "readnoise"] class_alias = "jump"
[docs] def process(self, step_input): """ Step method to execute step computations. Parameters ---------- step_input : RampModel The ramp model input from the previous step. Returns ------- result : RampModel The ramp model with jump step as COMPLETE and jumps detected or the jump step is SKIPPED. """ # Open the input data model with datamodels.RampModel(step_input) as input_model: tstart = time.time() # Check for an input model with NGROUPS<=2 nints, ngroups, nrows, ncols = input_model.data.shape if ngroups <= 2: self.log.warning("Cannot apply jump detection when NGROUPS<=2;") self.log.warning("Jump step will be skipped") input_model.meta.cal_step.jump = "SKIPPED" return input_model self.log.info("CR rejection threshold = %g sigma", self.rejection_threshold) if self.maximum_cores != "none": self.log.info("Maximum cores to use = %s", self.maximum_cores) # Detect jumps using a copy of the input data model. result = input_model.copy() jump_data = self._setup_jump_data(result) new_gdq, new_pdq, number_crs, number_extended_events, stddev = detect_jumps_data( jump_data ) # Update the DQ arrays of the output model with the jump detection results result.groupdq = new_gdq result.pixeldq = new_pdq # determine the number of groups with all pixels set to DO_NOT_USE dnu_flag = dqflags.pixel["DO_NOT_USE"] num_flagged_grps = 0 for integ in range(nints): for grp in range(ngroups): if np.all(np.bitwise_and(result.groupdq[integ, grp, :, :], dnu_flag)): num_flagged_grps += 1 total_groups = nints * ngroups - num_flagged_grps - nints if total_groups >= 1: total_time = result.meta.exposure.group_time * total_groups total_pixels = nrows * ncols crs = 1000 * number_crs / (total_time * total_pixels) result.meta.exposure.primary_cosmic_rays = crs events = 1e6 * number_extended_events / (total_time * total_pixels) result.meta.exposure.extended_emission_events = events tstop = time.time() self.log.info("The execution time in seconds: %f", tstop - tstart) result.meta.cal_step.jump = "COMPLETE" return result
def _setup_jump_data(self, result): """ Create a JumpData instance to be used by STCAL jump. Parameters ---------- result : RampModel The ramp model input from the previous step. Returns ------- jump_data : JumpData The data container to be used to run the STCAL detect_jumps_data. """ # Get the gain and readnoise reference files gain_filename = self.get_reference_file(result, "gain") self.log.info("Using GAIN reference file: %s", gain_filename) readnoise_filename = self.get_reference_file(result, "readnoise") self.log.info("Using READNOISE reference file: %s", readnoise_filename) with ( datamodels.ReadnoiseModel(readnoise_filename) as rnoise_m, datamodels.GainModel(gain_filename) as gain_m, ): # Get 2D gain and read noise values from their respective models if reffile_utils.ref_matches_sci(result, gain_m): gain_2d = gain_m.data else: self.log.info("Extracting gain subarray to match science data") gain_2d = reffile_utils.get_subarray_model(result, gain_m).data if reffile_utils.ref_matches_sci(result, rnoise_m): rnoise_2d = rnoise_m.data else: self.log.info("Extracting readnoise subarray to match science data") rnoise_2d = reffile_utils.get_subarray_model(result, rnoise_m).data # Instantiate a JumpData class and populate it based on the input RampModel. jump_data = JumpData(result, gain_2d, rnoise_2d, dqflags.pixel) jump_data.set_detection_settings( self.rejection_threshold, self.three_group_rejection_threshold, self.four_group_rejection_threshold, self.max_jump_to_flag_neighbors, self.min_jump_to_flag_neighbors, self.flag_4_neighbors, ) # determine the number of groups that correspond to the after_jump times # needed because the group time is not passed to detect_jumps_data gtime = result.meta.exposure.group_time after_jump_flag_n1 = int(self.after_jump_flag_time1 // gtime) after_jump_flag_n2 = int(self.after_jump_flag_time2 // gtime) jump_data.set_after_jump( self.after_jump_flag_dn1, after_jump_flag_n1, self.after_jump_flag_dn2, after_jump_flag_n2, ) sat_expand = self.sat_expand * 2 jump_data.set_snowball_info( self.expand_large_events, self.min_jump_area, self.min_sat_area, self.expand_factor, self.sat_required_snowball, self.min_sat_radius_extend, sat_expand, self.edge_size, ) max_extended_radius = self.max_extended_radius * 2 jump_data.set_shower_info( self.find_showers, self.extend_snr_threshold, self.extend_min_area, self.extend_inner_radius, self.extend_outer_radius, self.extend_ellipse_expand_ratio, self.min_diffs_single_pass, max_extended_radius, ) jump_data.set_sigma_clipping_info( self.minimum_groups, self.minimum_sigclip_groups, self.only_use_ints ) jump_data.max_cores = self.maximum_cores jump_data.grps_masked_after_shower = int(self.time_masked_after_shower // gtime) jump_data.mask_persist_grps_next_int = self.mask_snowball_core_next_int jump_data.persist_grps_flagged = int(self.snowball_time_masked_next_int // gtime) jump_data.max_shower_amplitude = jump_data.max_shower_amplitude * gtime return jump_data