Source code for jwst.stpipe.utilities

"""Utilities for working with JWST pipeline steps."""

import importlib.util
import inspect
import logging
import os
import re
from collections.abc import Sequence
from functools import wraps
from importlib import import_module

from jwst import datamodels

# Configure logging
logger = logging.getLogger(__name__)

# Step classes that are not user-api steps
NON_STEPS = [
    "EngDBLogStep",
    "FunctionWrapper",
    "JwstPipeline",
    "JwstStep",
    "Pipeline",
    "Step",
    "SystemCall",
]

NOT_SET = "NOT SET"
COMPLETE = "COMPLETE"
SKIPPED = "SKIPPED"

__all__ = [
    "all_steps",
    "load_local_pkg",
    "folder_traverse",
    "record_step_status",
    "query_step_status",
    "invariant_filename",
]


def all_steps():
    """
    List all classes subclassed from Step.

    Returns
    -------
    steps : dict
        Key is the classname, value is the class
    """
    from jwst.stpipe import Step

    jwst = import_module("jwst")
    jwst_fpath = os.path.split(jwst.__file__)[0]

    steps = {}
    for module in load_local_pkg(jwst_fpath):
        more_steps = {
            klass_name: klass
            for klass_name, klass in inspect.getmembers(
                module, lambda o: inspect.isclass(o) and issubclass(o, Step)
            )
            if klass_name not in NON_STEPS
        }
        steps.update(more_steps)

    return steps


def load_local_pkg(fpath):
    """
    Make a generator to list all modules under fpath.

    Parameters
    ----------
    fpath : str
        File path to the package to load.

    Yields
    ------
    module
        The next module found in the package.
    """
    package_fpath, package = os.path.split(fpath)
    package_fpath_len = len(package_fpath) + 1
    try:
        for module_fpath in folder_traverse(
            fpath, basename_regex=r"[^_].+\.py$", path_exclude_regex="(tests)|(regtest)"
        ):
            folder_path, fname = os.path.split(module_fpath[package_fpath_len:])
            module_path = folder_path.split("/")
            module_path.append(os.path.splitext(fname)[0])  # noqa: PTH122
            module_path = ".".join(module_path)
            try:
                spec = importlib.util.spec_from_file_location(module_path, module_fpath)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
            except Exception as err:
                logger.debug(f'Cannot load module "{module_path}": {str(err)}')
            else:
                yield module
    except Exception as err:
        logger.debug(f'Cannot complete package loading: Exception occurred: "{str(err)}"')


def folder_traverse(folder_path, basename_regex=".+", path_exclude_regex="^$"):
    """
    Traverse folder and generate full file paths for each file found.

    Parameters
    ----------
    folder_path : str
        The folder to traverse

    basename_regex : str
        Regular expression that must match
        the `basename` part of the file path.

    path_exclude_regex : str
        Regular expression to exclude a path.

    Yields
    ------
    file_path : str
        The full path to the next file
    """
    basename_regex = re.compile(basename_regex)
    path_exclude_regex = re.compile(path_exclude_regex)
    for root, _dirs, files in os.walk(folder_path):
        if path_exclude_regex.search(root):
            continue
        for file in files:
            if basename_regex.match(file):
                yield os.path.join(root, file)  # noqa: PTH118


[docs] def record_step_status(datamodel, cal_step, success=True): """ Record whether or not a step completed in meta.cal_step. Parameters ---------- datamodel : `~jwst.datamodels.JwstDataModel`, `~jwst.datamodels.container.ModelContainer`, \ `~jwst.datamodels.library.ModelLibrary` This is the datamodel or container of datamodels to modify in place cal_step : str The attribute in meta.cal_step for recording the status of the step success : bool If True, then 'COMPLETE' is recorded. If False, then 'SKIPPED' """ if success: status = COMPLETE else: status = SKIPPED if isinstance(datamodel, Sequence): for model in datamodel: model.meta.cal_step._instance[cal_step] = status # noqa: SLF001 elif isinstance(datamodel, datamodels.ModelLibrary): with datamodel: for model in datamodel: model.meta.cal_step._instance[cal_step] = status # noqa: SLF001 datamodel.shelve(model) else: datamodel.meta.cal_step._instance[cal_step] = status # noqa: SLF001
# TODO: standardize cal_step naming to point to the official step name
[docs] def query_step_status(datamodel, cal_step): """ Query the status of a step in meta.cal_step. For container types (ModelContainer and ModelLibrary), only the first datamodel in the container is checked. Parameters ---------- datamodel : `~jwst.datamodels.JwstDataModel`, `~jwst.datamodels.container.ModelContainer`, \ `~jwst.datamodels.library.ModelLibrary` The datamodel or container of datamodels to check cal_step : str The attribute in meta.cal_step to check Returns ------- status : str The status of the step in meta.cal_step, typically 'COMPLETE' or 'SKIPPED' Notes ----- In principle, a step could set the COMPLETE status for only some subset of models, so checking the zeroth model instance may not always be correct. However, this is not currently done in the pipeline. This function should be updated to accommodate that use-case as needed. """ if isinstance(datamodel, Sequence): return getattr(datamodel[0].meta.cal_step, cal_step, NOT_SET) elif isinstance(datamodel, datamodels.ModelLibrary): with datamodel: meta = datamodel.read_metadata(0) status = meta.get(f"meta.cal_step.{cal_step}", NOT_SET) return status else: return getattr(datamodel.meta.cal_step, cal_step, NOT_SET)
def invariant_filename(save_model_func): """ Restore meta.filename after save_model. Parameters ---------- save_model_func : function The save_model function to wrap Returns ------- save_model : function The wrapped save_model function """ @wraps(save_model_func) def save_model(model, **kwargs): try: filename = model.meta.filename except AttributeError: filename = None result = save_model_func(model, **kwargs) if filename: model.meta.filename = filename return result return save_model