Source code for jwst.stpipe.core

"""JWST-specific Step and Pipeline base classes."""

import logging
from functools import wraps
from pathlib import Path

from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import JwstDataModel, read_metadata
from stpipe import Pipeline, crds_client
from stpipe import Step as _Step

from jwst import __version__, __version_commit__
from jwst.datamodels import ModelContainer, ModelLibrary
from jwst.lib.suffix import remove_suffix
from jwst.stpipe._cal_logs import _LOG_FORMATTER

log = logging.getLogger(__name__)

__all__ = ["JwstStep", "JwstPipeline"]


[docs] class JwstStep(_Step): """A JWST pipeline step.""" spec = """ output_ext = string(default='.fits') # Output file type """ # noqa: E501 _log_records_formatter = _LOG_FORMATTER @classmethod def _datamodels_open(cls, init, **kwargs): return datamodels.open(init, **kwargs) @classmethod def _get_crds_parameters(cls, dataset): """ Get CRDS parameters for the given dataset. If the input dataset is a filename, achieve this by lazy-loading its metadata. Parameters ---------- dataset : str The name of the dataset. Returns ------- dict A dictionary of CRDS parameters. str The name of the observatory. """ crds_observatory = "jwst" # list or container: just set to zeroth model # this is what stpipe does internally for ModelContainer already if isinstance(dataset, (list, tuple, ModelContainer)): if len(dataset) == 0: raise ValueError(f"Input dataset {dataset} is empty") dataset = dataset[0] # Already open: use the model's method to get CRDS parameters if isinstance(dataset, (ModelLibrary, JwstDataModel)): return ( dataset.get_crds_parameters(), crds_observatory, ) # If we get here, we had better have a filename if isinstance(dataset, str): dataset = Path(dataset) if not isinstance(dataset, Path): raise TypeError(f"Cannot get CRDS parameters for {dataset} of type {type(dataset)}") # for associations, open as ModelLibrary, which supports lazy-loading if dataset.suffix.lower() == ".json": model = ModelLibrary(dataset, asn_n_members=1, asn_exptypes=["science"]) return (model.get_crds_parameters(), crds_observatory) # for all other cases, use read_metadata directly to lazy-load return (read_metadata(dataset, flatten=True), crds_observatory) def load_as_level2_asn(self, obj): """ Load object as an association. Loads the specified object into a Level2 association. If necessary, prepend `Step.input_dir` to all members. Parameters ---------- obj : object Object to load as a Level2 association Returns ------- association : jwst.associations.lib.rules_level2_base.DMSLevel2bBase Association """ # Prevent circular import: from jwst.associations.lib.update_path import update_key_value from jwst.associations.load_as_asn import LoadAsLevel2Asn asn = LoadAsLevel2Asn.load(obj, basename=self.output_file) update_key_value(asn, "expname", (), mod_func=self.make_input_path) return asn def load_as_level3_asn(self, obj): """ Load object as an association. Loads the specified object into a Level3 association. If necessary, prepend `Step.input_dir` to all members. Parameters ---------- obj : object Object to load as a Level3 association Returns ------- association : jwst.associations.lib.rules_level3_base.DMS_Level3_Base Association """ # Prevent circular import: from jwst.associations.lib.update_path import update_key_value from jwst.associations.load_as_asn import LoadAsAssociation asn = LoadAsAssociation.load(obj) update_key_value(asn, "expname", (), mod_func=self.make_input_path) return asn def finalize_result(self, result, reference_files_used): """ Update the result with the software version and reference files used. Parameters ---------- result : `~stdatamodels.DataModel` The output data model to be updated. reference_files_used : list of tuple The names and file paths of reference files used. """ if isinstance(result, JwstDataModel): result.meta.calibration_software_revision = __version_commit__ or "RELEASE" result.meta.calibration_software_version = __version__ if len(reference_files_used) > 0: for ref_name, filename in reference_files_used: if hasattr(result.meta.ref_file, ref_name): getattr(result.meta.ref_file, ref_name).name = filename result.meta.ref_file.crds.sw_version = crds_client.get_svn_version() result.meta.ref_file.crds.context_used = crds_client.get_context_used( result.crds_observatory ) if self.parent is None: log.info(f"Results used CRDS context: {result.meta.ref_file.crds.context_used}") if self.class_alias: if not hasattr(result, "cal_logs"): result.cal_logs = {} setattr(result.cal_logs, self.class_alias, self._log_records) def remove_suffix(self, name): """ Remove the suffix if a known suffix is already in name. Parameters ---------- name : str The name to remove the suffix from. Returns ------- name : str The name with the suffix removed. """ return remove_suffix(name) @wraps(_Step.run) def run(self, *args, **kwargs): """ Run the step. Parameters ---------- *args Arguments passed to `stpipe.Step.run`. **kwargs Keyword arguments passed to `stpipe.Step.run`. Returns ------- result : Any The step output """ result = super().run(*args, **kwargs) if not self.parent: log.info(f"Results used jwst version: {__version__}") return result
[docs] class JwstPipeline(Pipeline, JwstStep): """ A JWST pipeline. JwstPipeline needs to inherit from Pipeline, but also be a subclass of JwstStep so that it will pass checks when constructing a pipeline using JwstStep class methods. """ def finalize_result(self, result, _reference_files_used): """ Update the result with the software version and reference files used. Parameters ---------- result : `~stdatamodels.DataModel` The output data model to be updated. _reference_files_used : list of tuple The names and file paths of reference files used. """ if isinstance(result, JwstDataModel): log.info( "Results used CRDS context: " f"{crds_client.get_context_used(result.crds_observatory)}" ) if self.class_alias: if not hasattr(result, "cal_logs"): result.cal_logs = {} # remove the step logs as they're captured by the pipeline log for _, step in self.step_defs.items(): if hasattr(result.cal_logs, step.class_alias): delattr(result.cal_logs, step.class_alias) setattr(result.cal_logs, self.class_alias, self._log_records)