"""
Association Registry and Markers.
The Registry object holds a set of rules used to generate association
candidates from a list of PoolRow entries.
"""
import importlib.util
import logging
from enum import EnumMeta
from inspect import getmembers, isclass, isfunction, ismethod, ismodule
from os.path import (
expandvars,
)
from pathlib import Path
from jwst.associations import libpath
from jwst.associations.exceptions import AssociationError, AssociationNotValidError
from jwst.associations.lib.callback_registry import CallbackRegistry
__all__ = ["AssociationRegistry", "RegistryMarker"]
# Configure logging
logger = logging.getLogger(__name__)
# Library files
_ASN_RULE = "association_rules.py"
[docs]
class AssociationRegistry(dict):
"""
The available association rules to match against.
Notes
-----
The general workflow is as follows:
* Create the registry
>>> from jwst.associations.registry import AssociationRegistry
>>> registry = AssociationRegistry()
* Create associations from an item
>>> associations, reprocess = registry.match(item) # doctest: +SKIP
* Finalize the associations
>>> final_asns = registry.callback.reduce("finalize", associations) # doctest: +SKIP
In practice, this is one step in a larger loop over all items to
be associated. This does not account for adding items to already
existing associations. See :py:func:`~jwst.associations.generate` for more information.
"""
def __init__(
self,
definition_files=None,
include_default=True,
global_constraints=None,
name=None,
include_bases=False,
):
"""
Initialize a new registry.
Parameters
----------
definition_files : [str,] or None
The files to find the association definitions in.
include_default : bool
True to include the default definitions.
global_constraints : Constraint or None
Constraints to be added to each rule.
name : str or None
An identifying string, used to prefix rule names.
include_bases : bool
If True, include base classes not considered
rules.
"""
super().__init__()
# Generate a UUID for this instance. Used to modify rule
# names.
self.name = name
# Callback registry
self.callback = CallbackRegistry()
# Precache the set of rules
self._rule_set = set()
if definition_files is None:
definition_files = []
if include_default:
definition_files.insert(0, libpath() / _ASN_RULE)
if len(definition_files) <= 0:
raise AssociationError("No rule definition files specified.")
self.schemas = []
self.Utility = type("Utility", (object,), {})
for fname in definition_files:
module = import_from_file(fname)
self.populate(
module, global_constraints=global_constraints, include_bases=include_bases
)
@property
def rule_set(self):
"""
Generate set of rules within the registry.
Returns
-------
set
The set of rules.
"""
return self._rule_set
[docs]
def match(self, item, version_id=None, allow=None, ignore=None):
"""
See if item belongs to any of the associations defined.
Parameters
----------
item : dict
An item, like from a Pool, to find associations for.
version_id : str
If specified, a string appended to association names.
If None, nothing is used.
allow : [type(Association), ...]
List of rules to allow to be matched. If None, all
available rules will be used.
ignore : list
A list of associations to ignore when looking for a match.
Intended to ensure that already created associations
are not re-created.
Returns
-------
associations : [association,...]
List of associations item belongs to, empty if none match.
reprocess_list : [AssociationReprocess, ...]
List of reprocess events.
"""
if allow is None:
allow = self.rule_set
if ignore is None:
ignore = []
associations = []
process_list = []
for _name, rule in self.items():
if rule not in ignore and rule in allow:
asn, reprocess = rule.create(item, version_id)
process_list.extend(reprocess)
if asn is not None:
associations.append(asn)
return associations, process_list
[docs]
def validate(self, association):
"""
Validate a given association.
Parameters
----------
association : association-like
The data to validate.
Returns
-------
rules : list
List of rules that validated.
Raises
------
AssociationNotValidError
Association did not validate.
"""
# Change rule validation from an exception
# to a boolean
def is_valid(rule, association):
try:
rule.validate(association)
except AssociationNotValidError:
return False
else:
return True
results = [rule for rule_name, rule in self.items() if is_valid(rule, association)]
if len(results) == 0:
raise AssociationNotValidError(f'Structure did not validate: "{association}"')
return results
[docs]
def load(self, serialized, fmt=None, validate=True, first=True, **kwargs):
"""
Load a previously serialized association.
Parameters
----------
serialized : object
The serialized form of the association.
fmt : str or None
The format to force. If None, try all available.
validate : bool
Validate against the class's defined schema, if any.
first : bool
A serialization potentially matches many rules.
Only return the first successful load.
**kwargs : dict
Other arguments to pass to the `load` method.
Returns
-------
association or [association, ...]
The Association object, or the list of association objects.
Raises
------
AssociationError
Cannot create or validate the association.
"""
results = []
for _rule_name, rule in self.items():
try:
results.append(rule.load(serialized, format=fmt, validate=validate, **kwargs))
except (AssociationError, AttributeError) as err:
lasterr = err
continue
if first:
break
if len(results) == 0:
raise lasterr
if first:
return results[0]
else:
return results
[docs]
def populate(self, module, global_constraints=None, include_bases=None):
"""
Parse out all rules and callbacks in a module and add them to the registry.
Parameters
----------
module : module
The module, and all submodules, to be parsed.
"""
for name, obj in get_marked(module, include_bases=include_bases):
# Add rules.
if include_bases or obj.asnreg_role == "rule":
try:
self.add_rule(name, obj, global_constraints=global_constraints)
except TypeError:
logger.debug(f"Could not add object {obj} as a rule due to TypeError")
continue
# Add callbacks
if obj.asnreg_role == "callback":
for event in obj.asnreg_events:
self.callback.add(event, obj)
continue
# Add schema
if obj.asnreg_role == "schema":
self.schemas.append(obj.asnreg_schema)
continue
# Add utility classes
if obj.asnreg_role == "utility":
self.Utility = type("Utility", (obj, self.Utility), {})
[docs]
def add_rule(self, name, obj, global_constraints=None):
"""
Add object as rule to registry.
Parameters
----------
name : str
Name of the object.
obj : object
The object to be considered a rule.
global_constraints : dict
The global constraints to attach to the rule.
"""
try:
rule_name = "_".join([self.name, name])
except TypeError:
rule_name = name
if not valid_class(obj):
raise TypeError(f"Object cannot be used as rule: {obj}")
rule = type(rule_name, (obj,), {})
rule.GLOBAL_CONSTRAINT = global_constraints
rule.registry = self
self.__setitem__(rule_name, rule)
self._rule_set.add(rule)
[docs]
class RegistryMarker:
"""Mark rules, callbacks, and modules for inclusion into a registry."""
class Schema:
"""Store schema info for association registry."""
def __init__(self, obj):
self.asnreg_role = "schema"
self.asnreg_schema = obj
RegistryMarker.mark(self)
@property
def schema(self):
"""
Point schema to asnreg_schema.
Returns
-------
pathlib.PosixPath
The path to the AssociationRegistry schema.
"""
return self.asnreg_schema
[docs]
@staticmethod
def mark(obj):
"""
Mark that an object should be part of the registry.
Parameters
----------
obj : object
The object to mark.
Returns
-------
obj
Object that has been marked. Returned to enable
use as a decorator.
Notes
-----
The following attributes are added to the object:
- asnreg_mark : True
Attribute added to object and is set to True.
- asnreg_role : str or None
If not already assigned, the role is left
unspecified using None.
"""
obj.asnreg_marked = True
obj.asnreg_role = getattr(obj, "asnreg_role", None)
return obj
[docs]
@staticmethod
def rule(obj):
"""
Mark object as rule.
Parameters
----------
obj : object
The object that should be treated as a rule.
Returns
-------
obj : object
Return object to enable use as a decorator.
Notes
-----
The following attributes are added to the object:
- asnreg_role : 'rule'
Attributed added to object and set to `rule`.
- asnreg_mark : True
Attributed added to object and set to True.
"""
obj.asnreg_role = "rule"
RegistryMarker.mark(obj)
return obj
[docs]
@staticmethod
def callback(event):
"""
Mark object as a callback for an event.
Parameters
----------
event : str
Event this is a callback for.
Returns
-------
func
Function to use as a decorator for the object to be marked.
Notes
-----
The following attributes are added to the object:
- asnreg_role : 'callback'
The role the object as been assigned.
- asnreg_events : [event[, ...]]
The events this callable object is a callback for.
- asnreg_mark : True
Indicated that the object has been marked.
"""
def decorator(func):
try:
events = func.asnreg_events
except AttributeError:
events = []
events.append(event)
RegistryMarker.mark(func)
func.asnreg_role = "callback"
func.asnreg_events = events
return func
return decorator
[docs]
@staticmethod
def schema(filename):
"""
Mark a file as a schema source.
Returns
-------
schema
The schema object.
"""
schema = RegistryMarker.Schema(filename)
return schema
[docs]
@staticmethod
def utility(class_obj):
"""
Mark the class as a Utility class.
Returns
-------
object
The marked object.
"""
class_obj.asnreg_role = "utility"
RegistryMarker.mark(class_obj)
return class_obj
[docs]
@staticmethod
def is_marked(obj):
"""
Check to see if object has been marked.
Returns
-------
bool
Whether or not the object has been marked.
"""
return hasattr(obj, "asnreg_marked")
# Utilities
def import_from_file(filename):
"""
Import a file as a module.
Parameters
----------
filename : str
The file to import
Returns
-------
module : python module
The imported module
"""
path = expandvars(str(Path(filename).expanduser()))
module_name = str(Path(path).name).split(".")[0]
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def get_marked(module, predicate=None, include_bases=False):
"""
Recursively get all executable objects.
Parameters
----------
module : python module
The module to examine
predicate : bool func(object)
Determinant of what gets returned.
If None, all object types are examined
include_bases : bool
If True, include base classes not considered
rules.
Returns
-------
class object : generator
A generator that will yield all class members in the module.
"""
def is_method(obj):
return isfunction(obj) or ismethod(obj)
for name, obj in getmembers(module, predicate):
if isclass(obj):
for sub_name, sub_obj in get_marked(obj, predicate=is_method):
yield sub_name, sub_obj
if RegistryMarker.is_marked(obj) or include_bases:
yield name, obj
elif RegistryMarker.is_marked(obj):
if ismodule(obj):
for sub_name, sub_obj in get_marked(
obj, predicate=predicate, include_bases=include_bases
):
yield sub_name, sub_obj
else:
yield name, obj
def valid_class(obj):
"""
Verify if a given object could be used as a rule class.
Parameters
----------
obj : obj
Object to check
Returns
-------
is_valid : bool
True if the object could be considered a rule class
"""
is_valid = type(obj) is not EnumMeta and isclass(obj)
return is_valid