Source code for stcal.resample.utils

import logging
import warnings

import numpy as np
from scipy.ndimage import median_filter
from astropy.nddata.bitmask import (
    bitfield_to_boolean_mask,
    interpret_bit_flags,
)
from astropy import units as u
from spherical_geometry.polygon import SphericalPolygon  # type: ignore[import-untyped]


__all__ = [
    "build_driz_weight",
    "build_mask",
    "compute_mean_pixel_area",
    "get_tmeasure",
    "is_flux_density",
    "is_imaging_wcs",
    "resample_range",
]

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)


def resample_range(data_shape, bbox=None):
    # Find range of input pixels to resample:
    if len(data_shape) != 2:
        raise ValueError("Expected 'data_shape' to be of length 2.")

    if bbox is None:
        xmin = ymin = 0
        xmax = max(data_shape[1] - 1, 0)
        ymax = max(data_shape[0] - 1, 0)

    else:
        if len(bbox) != 2:
            raise ValueError("Expected bounding box to be of length 2.")
        ((x1, x2), (y1, y2)) = bbox
        xmin = max(0, int(x1 + 0.5))
        ymin = max(0, int(y1 + 0.5))
        xmax = max(xmin, min(data_shape[1] - 1, int(x2 + 0.5)))
        ymax = max(ymin, min(data_shape[0] - 1, int(y2 + 0.5)))

    return xmin, xmax, ymin, ymax


def build_mask(dqarr, good_bits, flag_name_map=None):
    """Build a bit mask from an input DQ array and a bitvalue flag

    In the returned bit mask, 1 is good, 0 is bad
    """
    good_bits = interpret_bit_flags(good_bits, flag_name_map=flag_name_map)

    dqmask = bitfield_to_boolean_mask(
        dqarr,
        good_bits,
        good_mask_value=1,
        dtype=np.uint8,
        flag_name_map=flag_name_map,
    )
    return dqmask


def build_driz_weight(model, weight_type=None, good_bits=None,
                      flag_name_map=None):
    """ Create a weight map that is used for weighting input images when
    they are co-added to the ouput model.

    Parameters
    ----------
    model : dict
        Input model: a dictionary of relevant keywords and values.

    weight_type : {"exptime", "ivm"}, None, optional
        The weighting type for adding models' data. For
        ``weight_type="ivm"``, the weighting will be
        determined per-pixel using the inverse of the read noise
        (VAR_RNOISE) array stored in each input image. If the
        ``VAR_RNOISE`` array does not exist,
        the variance is set to 1 for all pixels (i.e., equal weighting).
        If ``weight_type="exptime"``, the weight will be set equal
        to the measurement time when available and to
        the exposure time otherwise for pixels not flagged in the DQ array of
        the model. The default value of `None` will
        set weights to 1 for pixels not flagged in the DQ array of the model.
        Pixels flagged as "bad" in the DQ array will have thier weights
        set to 0.

    good_bits : int, str, None, optional
        An integer bit mask, `None`, a Python list of bit flags, a comma-,
        or ``'|'``-separated, ``'+'``-separated string list of integer
        bit flags or mnemonic flag names that indicate what bits in models'
        DQ bitfield array should be *ignored* (i.e., zeroed).

        See `Resample` for more information.

    flag_name_map : astropy.nddata.BitFlagNameMap, dict, None, optional
        A `~astropy.nddata.BitFlagNameMap` object or a dictionary that provides
        mapping from mnemonic bit flag names to integer bit values in order to
        translate mnemonic flags to numeric values when ``bit_flags``
        that are comma- or '+'-separated list of menmonic bit flag names.

    """
    data = model["data"]
    dq = model["dq"]

    dqmask = bitfield_to_boolean_mask(
        dq,
        good_bits,
        good_mask_value=1,
        dtype=np.uint8,
        flag_name_map=flag_name_map,
    )

    if weight_type == 'ivm':
        var_rnoise = model["var_rnoise"]
        if (var_rnoise is not None and
                var_rnoise.shape == data.shape):
            with np.errstate(divide="ignore", invalid="ignore"):
                inv_variance = var_rnoise**-1
            inv_variance[~np.isfinite(inv_variance)] = 1
            result = inv_variance * dqmask
        else:
            warnings.warn(
                "'var_rnoise' array not available. "
                "Setting drizzle weight map to 1",
                RuntimeWarning
            )
            result = dqmask

    elif weight_type == 'exptime':
        exptime, _ = get_tmeasure(model)
        result = np.float32(exptime) * dqmask

    elif weight_type is None:
        result = dqmask

    else:
        raise ValueError(
            f"Invalid weight type: {repr(weight_type)}."
            "Allowed weight types are 'ivm', 'exptime', or None."
        )

    return result.astype(np.float32)


def get_tmeasure(model):
    """
    Check if the measurement_time keyword is present in the datamodel
    for use in exptime weighting. If not, revert to using exposure_time.

    Returns a tuple of (exptime, is_measurement_time)
    """
    try:
        tmeasure = model["measurement_time"]
    except KeyError:
        return model["exposure_time"], False
    if tmeasure is None:
        return model["exposure_time"], False
    else:
        return tmeasure, True


[docs] def is_imaging_wcs(wcs): """ Returns `True` if ``wcs`` is an imaging WCS and `False` otherwise. """ imaging = all( ax == 'SPATIAL' for ax in wcs.output_frame.axes_type ) return imaging
def compute_mean_pixel_area(wcs, shape=None): """ Computes the average pixel area (in steradians) based on input WCS using pixels within either the bounding box (if available) or the entire data array as defined either by ``wcs.array_shape`` or the ``shape`` argument. Parameters ---------- shape : tuple, optional Shape of the region over which average pixel area will be computed. When not provided, pixel average will be estimated over a region defined by ``wcs.array_shape``. Returns ------- pix_area : float Pixel area in steradians. Notes ----- This function takes the outline of the region in which the average is computed (a rectangle defined by either the bounding box or ``wcs.array_shape`` or the ``shape``) and projects it to world coordinates. It then uses ``spherical_geometry`` to compute the area of the polygon defined by this outline on the sky. In order to minimize errors due to distortions in the ``wcs``, the code defines the outline using pixels spaced no more than 15 pixels apart along the border of the rectangle in which the average is computed. """ if (shape := (shape or wcs.array_shape)) is None: raise ValueError( "Either WCS must have 'array_shape' attribute set or 'shape' " "argument must be supplied." ) valid_polygon = False spatial_idx = np.where( np.array(wcs.output_frame.axes_type) == 'SPATIAL' )[0] ny, nx = shape if wcs.bounding_box is None: ((xmin, xmax), (ymin, ymax)) = ((-0.5, nx - 0.5), (-0.5, ny - 0.5)) else: ((xmin, xmax), (ymin, ymax)) = wcs.bounding_box if xmin > xmax: (xmin, xmax) = (xmax, xmin) if ymin > ymax: (ymin, ymax) = (ymax, ymin) xmin = max(0, int(xmin + 0.5)) xmax = min(nx - 1, int(xmax - 0.5)) ymin = max(0, int(ymin + 0.5)) ymax = min(ny - 1, int(ymax - 0.5)) k = 0 dxy = [1, -1, -1, 1] while xmin < xmax and ymin < ymax: try: (x, y, image_area, center, b, r, t, l) = _get_boundary_points( xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax, dx=min((xmax - xmin) // 4, 15), dy=min((ymax - ymin) // 4, 15) ) except ValueError: return None world = wcs(x, y) ra = world[spatial_idx[0]] dec = world[spatial_idx[1]] limits = [ymin, xmax, ymax, xmin] for _ in range(4): sl = [b, r, t, l][k] if not (np.all(np.isfinite(ra[sl])) and np.all(np.isfinite(dec[sl]))): limits[k] += dxy[k] ymin, xmax, ymax, xmin = limits k = (k + 1) % 4 break k = (k + 1) % 4 else: valid_polygon = True break ymin, xmax, ymax, xmin = limits if not valid_polygon: return None world = wcs(*center) wcenter = (world[spatial_idx[0]], world[spatial_idx[1]]) sky_area = SphericalPolygon.from_radec(ra, dec, center=wcenter).area() if sky_area > 2 * np.pi: log.warning( "Unexpectedly large computed sky area for an image. " "Setting area to: 4*Pi - area" ) sky_area = 4 * np.pi - sky_area if image_area == 0: log.error("Image area is zero; cannot compute pixel area.") return None pix_area = sky_area / image_area return pix_area def _get_boundary_points(xmin, xmax, ymin, ymax, dx=None, dy=None, shrink=0): # noqa: E741 """ Creates a list of ``x`` and ``y`` coordinates of points along the perimiter of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and ``shrink`` in counter-clockwise order. Parameters ---------- xmin : int X-coordinate of the left edge of a rectangle. xmax : int X-coordinate of the right edge of a rectangle. ymin : int Y-coordinate of the bottom edge of a rectangle. ymax : int Y-coordinate of the top edge of a rectangle. dx : int, float, None, optional Desired spacing between ajacent points alog horizontal edges of the rectangle. dy : int, float, None, optional Desired spacing between ajacent points alog vertical edges of the rectangle. shrink : int, optional Amount to be applied to input ``xmin``, ``xmax``, ``ymin``, ``ymax`` to reduce the rectangle size. Returns ------- x : numpy.ndarray An array of X-coordinates of points along the perimiter of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and ``shrink`` in counter-clockwise order. y : numpy.ndarray An array of Y-coordinates of points along the perimiter of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and ``shrink`` in counter-clockwise order. area : float Area in units of pixels of the region defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and ``shrink``. center : tuple A tuple of pixel coordinates at the center of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``. bottom : slice A `slice` object that allows selection of pixels from ``x`` and ``y`` arrays along the bottom edge of the rectangle. right : slice A `slice` object that allows selection of pixels from ``x`` and ``y`` arrays along the right edge of the rectangle. top : slice A `slice` object that allows selection of pixels from ``x`` and ``y`` arrays along the top edge of the rectangle. left : slice A `slice` object that allows selection of pixels from ``x`` and ``y`` arrays along the left edge of the rectangle. """ nx = xmax - xmin + 1 ny = ymax - ymin + 1 if dx is None or dx <= 0: dx = nx if dy is None or dy <= 0: dy = ny if nx - 2 * shrink < 1 or ny - 2 * shrink < 1: raise ValueError("Image size is too small.") sx = max(1, int(np.ceil(nx / dx))) sy = max(1, int(np.ceil(ny / dy))) xmin += shrink xmax -= shrink ymin += shrink ymax -= shrink size = 2 * sx + 2 * sy x = np.empty(size) y = np.empty(size) bottom = np.s_[0:sx] # bottom edge right = np.s_[sx:sx + sy] # right edge top = np.s_[sx + sy:2 * sx + sy] # top edge left = np.s_[2 * sx + sy:2 * sx + 2 * sy] # noqa: E741 left edge x[bottom] = np.linspace(xmin, xmax, sx, False) y[bottom] = ymin x[right] = xmax y[right] = np.linspace(ymin, ymax, sy, False) x[top] = np.linspace(xmax, xmin, sx, False) y[top] = ymax x[left] = xmin y[left] = np.linspace(ymax, ymin, sy, False) area = (xmax - xmin) * (ymax - ymin) center = (0.5 * (xmin + xmax), 0.5 * (ymin + ymax)) return x, y, area, center, bottom, right, top, left def is_flux_density(bunit): """ Differentiate between surface brightness and flux density data units. Parameters ---------- bunit : str or `~astropy.units.Unit` Data units, e.g. 'MJy' (is flux density) or 'MJy/sr' (is not). Returns ------- bool True if the units are equivalent to flux density units. """ try: flux_density = u.Unit(bunit).is_equivalent(u.Jy) except (ValueError, TypeError): flux_density = False return flux_density