import inspect
import logging
import warnings
import astropy.units as u
import numpy as np
from astropy.convolution import Gaussian2DKernel, convolve
from astropy.stats import SigmaClip, gaussian_fwhm_to_sigma
from astropy.table import Table
from astropy.utils import lazyproperty
from astropy.utils.exceptions import AstropyUserWarning
from photutils.background import Background2D, MedianBackground
from photutils.detection import DAOStarFinder, IRAFStarFinder
from photutils.segmentation import SourceCatalog, SourceFinder
from photutils.segmentation.catalog import DEFAULT_COLUMNS
from photutils.utils import NoDetectionsWarning
from stdatamodels.jwst.datamodels import ImageModel, dqflags
log = logging.getLogger(__name__)
__all__ = ["make_tweakreg_catalog"]
SOURCECAT_COLUMNS = DEFAULT_COLUMNS + [
"ellipticity",
"sky_bbox_ll",
"sky_bbox_ul",
"sky_bbox_lr",
"sky_bbox_ur",
]
class JWSTBackground:
"""Class to estimate a 2D background and background RMS noise in an image."""
def __init__(self, data, box_size=100, coverage_mask=None):
"""
Initialize the class.
Parameters
----------
data : ndarray
The input 2D image for which to estimate the background.
box_size : int or array-like (int)
The box size along each axis. If ``box_size`` is a scalar then
a square box of size ``box_size`` will be used. If ``box_size``
has two elements, they should be in ``(ny, nx)`` order.
coverage_mask : array-like (bool), optional
A boolean mask, with the same shape as ``data``, where a `True`
value indicates the corresponding element of ``data`` is masked.
Masked data are excluded from calculations. ``coverage_mask``
should be `True` where there is no coverage (i.e., no data) for
a given pixel (e.g., blank areas in a mosaic image). It should
not be used for bad pixels.
"""
self.data = data
self.box_size = np.asarray(box_size).astype(int) # must be integer
self.coverage_mask = coverage_mask
@lazyproperty
def _background2d(self):
"""
Estimate the 2D background and background RMS noise in an image.
Returns
-------
background : `photutils.background.Background2D`
A Background2D object containing the 2D background and
background RMS noise estimates.
"""
sigma_clip = SigmaClip(sigma=3.0)
bkg_estimator = MedianBackground()
filter_size = (3, 3)
# All data have NaNs. Suppress warnings about them.
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=AstropyUserWarning)
try:
bkg = Background2D(
self.data,
self.box_size,
filter_size=filter_size,
coverage_mask=self.coverage_mask,
sigma_clip=sigma_clip,
bkg_estimator=bkg_estimator,
)
except ValueError:
# use the entire unmasked array
bkg = Background2D(
self.data,
self.data.shape,
filter_size=filter_size,
coverage_mask=self.coverage_mask,
sigma_clip=sigma_clip,
bkg_estimator=bkg_estimator,
exclude_percentile=100.0,
)
log.info(
"Background could not be estimated in meshes. "
"Using the entire unmasked array for background "
f"estimation: bkg_boxsize={self.data.shape}."
)
return bkg
@lazyproperty
def background(self):
"""
Compute the 2-D background if it has not been computed yet, then return it.
Returns
-------
background : ndarray
The 2D background image.
"""
return self._background2d.background
@lazyproperty
def background_rms(self):
"""
Compute the 2-D background RMS image if it has not been computed yet, then return it.
Returns
-------
background_rms : ndarray
The 2D background RMS image.
"""
return self._background2d.background_rms
def _convolve_data(data, kernel_fwhm, mask=None):
"""
Convolve the data with a Gaussian2D kernel.
Parameters
----------
data : ndarray
The 2D array to convolve.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the 2D Gaussian kernel.
mask : array-like, bool, optional
A boolean mask with the same shape as ``data``, where a `True`
value indicates the corresponding element of ``data`` is masked.
Returns
-------
convolved_data : ndarray
The convolved 2D array.
"""
sigma = kernel_fwhm * gaussian_fwhm_to_sigma
kernel = Gaussian2DKernel(sigma)
# All data have NaNs. Suppress warnings about them.
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=AstropyUserWarning)
return convolve(data, kernel, mask=mask)
def _rename_columns(sources):
"""
Rename catalog columns and add astropy units to be consistent between the three star finders.
Table is modified in place.
Parameters
----------
sources : `~astropy.table.QTable`
The source catalog table for which to rename columns.
"""
rename_map = {
"pa": "orientation", # for iraf and dao star finder compatibility with source_catalog step
"npix": "area", # for iraf and dao star finder compatibility with source_catalog step
"segment_flux": "flux", # for sourcefinder compatibility with tweakreg step
"label": "id", # for sourcefinder compatibility with tweakreg step
}
for old_col, new_col in rename_map.items():
if old_col in sources.colnames:
sources.rename_column(old_col, new_col)
units_map = {"orientation": u.deg}
for col, unit in units_map.items():
if col in sources.colnames:
sources[col] = u.Quantity(sources[col], unit=unit)
def _sourcefinder_wrapper(data, threshold_img, kernel_fwhm, mask=None, **kwargs):
"""
Make input and output of SourceFinder consistent with IRAFStarFinder and DAOStarFinder.
Wrapper function for photutils.source_finder.SourceFinder to make input
and output consistent with DAOStarFinder and IRAFStarFinder.
Parameters
----------
data : array-like
The 2D array of the image.
threshold_img : ndarray
The per-pixel absolute image value above which to select sources.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the 2D Gaussian kernel.
mask : array-like (bool), optional
The image mask
**kwargs : dict
Additional keyword arguments passed to `photutils.segmentation.SourceFinder`
and/or `photutils.segmentation.SourceCatalog`.
Returns
-------
sources : `~astropy.table.QTable`
A table containing the found sources.
References
----------
:ref:`photutils segmentation tutorial <photutils:image_segmentation>`.
"""
default_kwargs = {
"npixels": 10,
"progress_bar": False,
}
kwargs = {**default_kwargs, **kwargs}
# convolve the data with a Gaussian kernel
if kernel_fwhm > 0:
conv_data = _convolve_data(data, kernel_fwhm, mask=mask)
else:
conv_data = data
# handle passing kwargs into SourceFinder and SourceCatalog
# note that this suppresses TypeError: unexpected keyword arguments
# so user must be careful to know which kwargs are passed in here
finder_args = list(inspect.signature(SourceFinder).parameters)
catalog_args = list(inspect.signature(SourceCatalog).parameters)
finder_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in finder_args}
catalog_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in catalog_args}
if ("kron_params" in catalog_dict.keys()) and (catalog_dict["kron_params"] is None):
# necessary because cannot specify default in Step spec string
catalog_dict["kron_params"] = (2.5, 1.4, 0.0)
finder = SourceFinder(**finder_dict)
segment_map = finder(conv_data, threshold_img, mask=mask)
if segment_map is None:
return None, None
sources = SourceCatalog(
data,
segment_map,
mask=mask,
convolved_data=conv_data,
**catalog_dict,
).to_table(columns=SOURCECAT_COLUMNS)
return sources, segment_map
def _iraf_starfinder_wrapper(data, threshold_img, kernel_fwhm, mask=None, **kwargs):
"""
Make input and output of IRAFStarFinder consistent with SourceFinder and DAOStarFinder.
Parameters
----------
data : array-like
The 2D array of the image.
threshold_img : ndarray
The per-pixel absolute image value above which to select sources.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the Gaussian kernel
mask : array-like (bool), optional
The image mask
**kwargs : dict
Additional keyword arguments passed to `photutils.detection.IRAFStarFinder`.
Returns
-------
sources : `~astropy.table.QTable`
A table containing the found sources.
segmentation_image : ndarray or None
The segmentation image, or None if not applicable.
"""
# note that this suppresses TypeError: unexpected keyword arguments
# so user must be careful to know which kwargs are passed in here
finder_args = list(inspect.signature(IRAFStarFinder).parameters)
finder_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in finder_args}
threshold = np.median(threshold_img) # only float is supported, not per-pixel value
starfind = IRAFStarFinder(threshold, kernel_fwhm, **finder_dict)
sources = starfind(data, mask=mask)
return sources, None
def _dao_starfinder_wrapper(data, threshold_img, kernel_fwhm, mask=None, **kwargs):
"""
Make input and output of DAOStarFinder consistent with SourceFinder and IRAFStarFinder.
Parameters
----------
data : array-like
The 2D array of the image.
threshold_img : ndarray
The per-pixel absolute image value above which to select sources.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the Gaussian kernel
mask : array-like (bool), optional
The image mask
**kwargs : dict
Additional keyword arguments passed to `photutils.detection.DAOStarFinder`.
Returns
-------
sources : `~astropy.table.QTable`
A table containing the found sources.
segmentation_image : ndarray or None
The segmentation image, or None if not applicable.
"""
# for consistency with IRAFStarFinder, allow minsep_fwhm to be passed in
# and convert to pixels in the same way that IRAFStarFinder does
# see IRAFStarFinder readthedocs page and also
# https://github.com/astropy/photutils/issues/1561
if "minsep_fwhm" in kwargs:
min_sep_pix = max(2, int(kwargs["minsep_fwhm"] * kernel_fwhm + 0.5))
kwargs["min_separation"] = min_sep_pix
# note that this suppresses TypeError: unexpected keyword arguments
# so user must be careful to know which kwargs are passed in here
finder_args = list(inspect.signature(DAOStarFinder).parameters)
finder_dict = {k: kwargs.pop(k) for k in dict(kwargs) if k in finder_args}
threshold = np.median(threshold_img) # only float is supported, not per-pixel value
starfind = DAOStarFinder(threshold, kernel_fwhm, **finder_dict)
sources = starfind(data, mask=mask)
return sources, None
[docs]
def make_tweakreg_catalog(
model,
snr_threshold,
kernel_fwhm,
bkg_boxsize=400,
coverage_mask=None,
starfinder_name="iraf",
starfinder_kwargs=None,
):
"""
Create a catalog of point-line sources to be used for image alignment in tweakreg.
Parameters
----------
model : `~jwst.datamodels.ImageModel`
The input `~jwst.datamodels.ImageModel` of a single image. The input image is
assumed to be background subtracted.
snr_threshold : float
The signal-to-noise ratio per pixel above the ``background`` for
which to consider a pixel as possibly being part of a source.
kernel_fwhm : float
The full-width at half-maximum (FWHM) of the Gaussian kernel
used to convolve the image.
bkg_boxsize : float, optional
The background mesh box size in pixels.
coverage_mask : array-like (bool), optional
A boolean mask with the same shape as ``model.data``, where a `True`
value indicates the corresponding element of ``model.data`` is masked.
Masked pixels will not be included in any source.
starfinder_name : str, optional
The ``photutils`` star finder to use. Options are 'dao', 'iraf', or 'segmentation':
- 'dao': `photutils.detection.DAOStarFinder`
- 'iraf': `photutils.detection.IRAFStarFinder`
- 'segmentation': `photutils.segmentation.SourceFinder`
starfinder_kwargs : dict, optional
Additional keyword arguments to be passed to the star finder.
for 'segmentation', these can be kwargs to `photutils.segmentation.SourceFinder`
and/or `photutils.segmentation.SourceCatalog`.
for 'dao' or 'iraf', these are kwargs to `photutils.detection.DAOStarFinder`
or `photutils.detection.IRAFStarFinder`, respectively.
Defaults are as stated in the docstrings of those functions unless noted here:
- 'dao': fwhm=2.5
- 'iraf': fwhm=2.5
- 'segmentation': npixels=10, progress_bar=False
Returns
-------
catalog : `~astropy.table.Table`
An astropy Table containing the source catalog.
segmentation_image : ndarray or None
The segmentation image, or None if not applicable.
"""
if not isinstance(model, ImageModel):
raise TypeError("The input model must be an ImageModel.")
if starfinder_kwargs is None:
starfinder_kwargs = {}
if starfinder_name.lower() in ["dao", "daostarfinder"]:
starfinder = _dao_starfinder_wrapper
elif starfinder_name.lower() in ["iraf", "irafstarfinder"]:
starfinder = _iraf_starfinder_wrapper
elif starfinder_name.lower() in ["segmentation", "sourcefinder"]:
starfinder = _sourcefinder_wrapper
else:
raise ValueError(f"Unknown starfinder type: {starfinder_name}")
# Mask the non-imaging area, e.g. reference pixels and MIRI non-science area
if coverage_mask is None:
coverage_mask = (
(dqflags.pixel["NON_SCIENCE"] + dqflags.pixel["DO_NOT_USE"]) & model.dq
).astype(bool)
# Compute the background and threshold image
try:
bkg = JWSTBackground(model.data, box_size=bkg_boxsize, coverage_mask=coverage_mask)
with warnings.catch_warnings():
# suppress warning about NaNs being automatically masked - this is desired
warnings.simplefilter("ignore", AstropyUserWarning)
threshold_img = bkg.background + (snr_threshold * bkg.background_rms)
except ValueError as e:
log.warning(f"Error determining sky background: {e.args[0]}")
sources = _empty_table()
_rename_columns(sources)
return sources, None
# Run the star finder
with warnings.catch_warnings(): # handle lack of detections later
warnings.filterwarnings(
"ignore", category=NoDetectionsWarning, message="No sources were found"
)
sources, segmentation_image = starfinder(
model.data,
threshold_img,
kernel_fwhm,
mask=coverage_mask,
**starfinder_kwargs,
)
if not sources:
log.warning("No sources found in the image.")
sources = _empty_table()
_rename_columns(sources)
return sources, segmentation_image
def _empty_table():
"""
Return an empty table with the correct column names and dtypes.
Returns
-------
`~astropy.table.Table`
An empty table with the correct column names and dtypes.
"""
default_names = ["id", "xcentroid", "ycentroid", "flux"]
default_dtypes = (int, float, float, float)
return Table(names=default_names, dtype=default_dtypes).copy()