Source code for jwst.associations.generator.generate

import logging
from timeit import default_timer as timer

from jwst.associations.association import make_timestamp
from jwst.associations.lib.process_list import (
    ListCategory,
    ProcessList,
    ProcessQueueSorted,
    workover_filter,
)
from jwst.associations.pool import PoolRow
from jwst.lib.progress import Bar

__all__ = ["generate"]

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


def generate(pool, rules, version_id=None, finalize=True):
    """
    Build associations from a pool by applying an association rule set.

    Parameters
    ----------
    pool : AssociationPool
        The pool to generate from. Each of its items is wrapped in a
        ``PoolRow`` before being matched against the rules.

    rules : AssociationRegistry
        The association rule set used to create and extend associations.

    version_id : None, True, or str
        Tag applied to associations and products. None disables tagging,
        a string is used verbatim, and True requests a freshly generated
        timestamp. Note that any boolean value, including False, is
        replaced by a timestamp.

    finalize : bool
        When True, run the rule set's 'finalize' callbacks over the
        collected associations. If that call raises KeyError, the failure
        is logged at debug level and the unfinalized list is returned.

    Returns
    -------
    associations : [Association[,...]]
        The generated (and, if requested, finalized) associations.

    Notes
    -----
    See the :ref:`Association Generator <design-generator>` documentation
    for a full description of the algorithm.
    """
    if isinstance(version_id, bool):
        version_id = make_timestamp()
    associations = []

    seed_rules = [rule for _name, rule in rules.items()]
    process_queue = ProcessQueueSorted([ProcessList(items=pool, rules=seed_rules)])
    logger.debug("Initial process queue: %s", process_queue)

    # The queue can grow while it is being iterated: handling an item may
    # request further processing passes, which are appended below.
    for process_list in process_queue:
        logger.debug("** Working process list: %s", process_list)

        started = timer()
        n_modified = n_created = n_requeued = 0
        with Bar(
            "Processing items",
            log_level=logger.getEffectiveLevel(),
            max=len(process_list.items),
        ) as progress:
            for raw_item in process_list.items:
                existing_asns, new_asns, follow_ups = generate_from_item(
                    PoolRow(raw_item), version_id, associations, rules, process_list
                )
                n_modified += len(existing_asns)
                n_created += len(new_asns)
                associations.extend(new_asns)

                # When working over EXISTING associations, workover_filter
                # strips follow-up work that is also EXISTING; lists reduced
                # to nothing are dropped. This prevents infinite loops.
                candidates = (
                    workover_filter(pending, process_list.work_over) for pending in follow_ups
                )
                accepted = [pending for pending in candidates if pending]
                process_queue.extend(accepted)
                n_requeued += len(accepted)
                progress.next()

        logger.info("Seconds to process: %.2f", timer() - started)
        logger.debug(
            "Existing associations modified: %d New associations created: %d",
            n_modified,
            n_created,
        )
        logger.debug("New process lists: %d", n_requeued)
        logger.debug("Updated process queue: %s", process_queue)
        logger.debug("# associations: %d", len(associations))
        logger.debug("Associations: %s", [type(asn) for asn in associations])

    # Finalize found associations
    logger.debug("# associations before finalization: %d", len(associations))
    result = associations
    if finalize:
        logger.debug("Performing association finalization.")
        try:
            result = rules.callback.reduce("finalize", associations)
        except KeyError as err:
            # Best-effort: keep the unfinalized associations.
            logger.debug("Finalization failed for reason: %s", err)

    logger.info("Associations generated: %s", len(result))
    return result
def generate_from_item(item, version_id, associations, rules, process_list):
    """
    Either match or generate a new association.

    Parameters
    ----------
    item : dict
        The item to match to existing associations or
        generate new associations from.

    version_id : str or None
        Version id to use with association creation.
        If None, no versioning is used.

    associations : [association, ...]
        List of already existing associations.
        If the item matches any of these, it will be added
        to them.

    rules : AssociationRegistry or None
        List of rules to create new associations. If None, no new
        associations are created. In that case, if ``process_list`` does
        not restrict the rules either, every existing association is
        eligible for matching.

    process_list : ProcessList
        The `ProcessList` to which the current item belongs.

    Returns
    -------
    tuple
        A 3-tuple containing:

        existing_asns : [association,...]
            List of existing associations item belongs to.
            Empty if none match.

        new_asns : [association,...]
            List of new associations item creates.
            Empty if none match.

        reprocess_list : [ProcessList, ...]
            Process events produced both by matching existing
            associations and by the rules when creating new ones.
    """
    # Set up the rules allowed to be examined. Rules carried by the process
    # list take precedence; otherwise fall back to the full registry.
    if process_list.rules:
        allowed_rules = process_list.rules
    elif rules is not None:
        allowed_rules = list(rules.values())
    else:
        # ``rules`` may be None (see above); calling ``rules.values()`` here
        # used to raise AttributeError. With no rule information at all, do
        # not restrict which existing associations may match.
        allowed_rules = None

    # Check membership in existing associations.
    existing_asns = []
    reprocess_list = []
    if process_list.work_over in (
        ListCategory.BOTH,
        ListCategory.EXISTING,
        ListCategory.NONSCIENCE,
    ):
        candidates = associations
        if allowed_rules is not None:
            # Exact type match: only associations whose class is one of the
            # allowed rules are considered.
            candidates = [asn for asn in associations if type(asn) in allowed_rules]
        existing_asns, reprocess_list = match_item(item, candidates)

    # Now see if this item will create new associations.
    # By default, an item will not be allowed to create
    # an association based on rules of existing associations.
    new_asns = []
    if (
        process_list.work_over
        in (
            ListCategory.BOTH,
            ListCategory.RULES,
        )
        and rules is not None
    ):
        # ``allowed_rules`` is never None on this path: ``rules`` is set.
        ignore_asns = {type(asn) for asn in existing_asns}
        new_asns, reprocess = rules.match(
            item,
            version_id=version_id,
            allow=allowed_rules,
            ignore=ignore_asns,
        )
        reprocess_list.extend(reprocess)

    return existing_asns, new_asns, reprocess_list


def match_item(item, associations):
    """
    Match item to a list of associations.

    Parameters
    ----------
    item : dict
        The item to match to the associations.

    associations : [association, ...]
        Candidate associations. Each candidate's ``add(item)`` is called
        and is expected to return a ``(matches, reprocess)`` pair.

    Returns
    -------
    (associations, process_list) : 2-tuple
        A tuple containing:

        associations : [association,...]
            Associations that accepted the item, in input order.
            Empty if none match.

        process_list : [ProcessList, ...]
            Process events returned by every ``add`` call, whether or not
            that association accepted the item.
    """
    item_associations = []
    process_list = []
    for asn in associations:
        # Only triggers when ``associations`` holds the same (or an equal,
        # since ``in`` uses ``==``) association more than once.
        if asn in item_associations:
            continue
        matches, reprocess = asn.add(item)
        process_list.extend(reprocess)
        if matches:
            item_associations.append(asn)
    return item_associations, process_list