import logging
from timeit import default_timer as timer
from jwst.associations.association import make_timestamp
from jwst.associations.lib.process_list import (
ListCategory,
ProcessList,
ProcessQueueSorted,
workover_filter,
)
from jwst.associations.pool import PoolRow
from jwst.lib.progress import Bar
# Module-level logger, named after this module so that its output can be
# filtered or configured per module.
logger = logging.getLogger(__name__)
# `generate` is the only name exported by a star-import of this module.
__all__ = ["generate"]
# Main entry point of the association generator.
def generate(pool, rules, version_id=None, finalize=True):
    """
    Build associations from a pool by applying a set of rules.

    Parameters
    ----------
    pool : AssociationPool
        The pool whose items are processed.

    rules : AssociationRegistry
        The association rule set.

    version_id : None, True, or str
        Tag applied to associations and products.
        If None, it is passed through unchanged.
        If True, a timestamp from `make_timestamp` is used.
        If a string, that string is used.
        Note that any boolean value, including False, is replaced
        by a timestamp.

    finalize : bool
        If True, reduce the associations through the rules'
        "finalize" callback. If that raises `KeyError`, the failure
        is logged at debug level and the unfinalized associations
        are returned.

    Returns
    -------
    associations : [Association[,...]]
        List of associations.

    Notes
    -----
    Refer to the :ref:`Association Generator <design-generator>`
    documentation for a full description.
    """
    if isinstance(version_id, bool):
        version_id = make_timestamp()

    associations = []

    # Seed the queue with a single list: every pool item against every rule.
    seed_list = ProcessList(items=pool, rules=[rule for _, rule in rules.items()])
    process_queue = ProcessQueueSorted([seed_list])
    logger.debug("Initial process queue: %s", process_queue)

    # The queue may grow while it is being iterated: processing an item
    # can schedule further process lists.
    for process_list in process_queue:
        logger.debug("** Working process list: %s", process_list)
        time_start = timer()
        n_modified = 0
        n_created = 0
        n_requeued = 0

        with Bar(
            "Processing items", log_level=logger.getEffectiveLevel(), max=len(process_list.items)
        ) as bar:
            for raw_item in process_list.items:
                row = PoolRow(raw_item)
                matched, created, to_process = generate_from_item(
                    row, version_id, associations, rules, process_list
                )
                n_modified += len(matched)
                n_created += len(created)
                associations.extend(created)

                # Run each newly requested list through the work-over
                # filter for the current list and keep only those that
                # survive. This prevents infinite loops, e.g. an EXISTING
                # list scheduling further EXISTING lists.
                requeue = [
                    kept
                    for kept in (
                        workover_filter(candidate, process_list.work_over)
                        for candidate in to_process
                    )
                    if kept
                ]
                process_queue.extend(requeue)
                n_requeued += len(requeue)
                bar.next()

        logger.info("Seconds to process: %.2f", timer() - time_start)
        logger.debug(
            "Existing associations modified: %d New associations created: %d",
            n_modified,
            n_created,
        )
        logger.debug("New process lists: %d", n_requeued)
        logger.debug("Updated process queue: %s", process_queue)
        logger.debug("# associations: %d", len(associations))
        logger.debug("Associations: %s", [type(asn) for asn in associations])

    # Finalize found associations
    logger.debug("# associations before finalization: %d", len(associations))
    result = associations
    if finalize:
        logger.debug("Performing association finalization.")
        try:
            result = rules.callback.reduce("finalize", associations)
        except KeyError as exception:
            # Fall back to the unfinalized associations.
            logger.debug("Finalization failed for reason: %s", exception)

    logger.info("Associations generated: %s", len(result))
    return result
def generate_from_item(item, version_id, associations, rules, process_list):
    """
    Either match or generate a new association.

    Parameters
    ----------
    item : dict
        The item to match to existing associations
        or generate new associations from.

    version_id : str or None
        Version id to use with association creation.
        If None, no versioning is used.

    associations : [association, ...]
        List of already existing associations.
        If the item matches any of these, it will be added
        to them.

    rules : AssociationRegistry or None
        Rules used to create new associations. If None, no new
        associations are created; the item is only matched against
        existing associations.

    process_list : ProcessList
        The `ProcessList` the current item belongs to. Its ``rules``
        restrict which rules are examined and its ``work_over`` selects
        whether existing associations, rules, or both are considered.

    Returns
    -------
    tuple
        A 3-tuple containing:

        existing_asns : [association,...]
            List of existing associations item belongs to.
            Empty if none match.

        new_asns : [association,...]
            List of new associations item creates. Empty if none match.

        reprocess_list : [ProcessList, ...]
            List of process events.
    """
    # Setup the rules allowed to be examined. The process list's own rules
    # take precedence; otherwise fall back to every rule in the registry.
    # When neither is available (`rules` is None), leave `allowed_rules` as
    # None, meaning "no restriction", instead of failing on `rules.values()`.
    if process_list.rules:
        allowed_rules = process_list.rules
    elif rules is not None:
        allowed_rules = list(rules.values())
    else:
        allowed_rules = None

    # Check membership in existing associations.
    existing_asns = []
    reprocess_list = []
    if process_list.work_over in (
        ListCategory.BOTH,
        ListCategory.EXISTING,
        ListCategory.NONSCIENCE,
    ):
        if allowed_rules is None:
            candidates = associations
        else:
            # Exact class comparison: only associations whose class is
            # itself one of the allowed rules are considered.
            candidates = [asn for asn in associations if type(asn) in allowed_rules]
        existing_asns, reprocess_list = match_item(item, candidates)

    # Now see if this item will create new associations.
    # By default, an item will not be allowed to create
    # an association based on rules of existing associations.
    new_asns = []
    if (
        process_list.work_over
        in (
            ListCategory.BOTH,
            ListCategory.RULES,
        )
        and rules is not None
    ):
        ignore_asns = {type(asn) for asn in existing_asns}
        new_asns, reprocess = rules.match(
            item,
            version_id=version_id,
            allow=allowed_rules,
            ignore=ignore_asns,
        )
        reprocess_list.extend(reprocess)

    return existing_asns, new_asns, reprocess_list
def match_item(item, associations):
    """
    Offer an item to each association and collect those that accept it.

    Parameters
    ----------
    item : dict
        The item to match to the associations.

    associations : [association, ...]
        List of already existing associations. Each one is asked,
        through its ``add`` method, to take the item.

    Returns
    -------
    (associations, process_list) : 2-tuple
        A tuple containing:

        associations : [association,...]
            List of associations item belongs to. Empty if none match.

        process_list : [ProcessList, ...]
            List of process events, gathered from every ``add`` call
            whether or not the item matched.
    """
    accepted = []
    events = []
    for candidate in associations:
        # Do not offer the item to a candidate that compares equal to
        # an association that has already accepted it.
        if candidate not in accepted:
            is_member, new_events = candidate.add(item)
            events.extend(new_events)
            if is_member:
                accepted.append(candidate)
    return accepted, events