Source code for jwst.associations.asn_gather

"""Copy data that is listed in an association."""

import logging
import subprocess
from pathlib import Path

# Only the gathering function is exported as public API.
__all__ = ["asn_gather"]


# Configure logging
# Module-level logger; its level is set by `from_cmdline` from the -v count.
logger = logging.getLogger(__name__)
# Log levels indexed by verbosity count: 0 -> WARNING, 1 -> INFO, 2 or more -> DEBUG.
LOGLEVELS = [logging.WARNING, logging.INFO, logging.DEBUG]


def asn_gather(
    association,
    destination=None,
    exp_types=None,
    exclude_types=None,
    source_folder=None,
    shellcmd="rsync -urv --no-perms --chmod=ugo=rwX",
):
    """
    Copy members of an association from one location to another.

    The association is copied into the destination, re-written such
    that the member list points to the new location of the members.

    Parameters
    ----------
    association : str, pathlib.Path
        The association to gather.
    destination : str, pathlib.Path, or None
        The folder to place the association and its members.
        If None, the current working directory is used.
    exp_types : [str[,...]] or None
        List of exposure types to gather.
        If None, all are gathered.
    exclude_types : [str[,...]] or None
        List of exposure types to exclude.
    source_folder : str or None
        Folder where the members originate from.
        If None, the folder of the association is presumed.
    shellcmd : str
        The shell command to use to do the copying of the
        individual members. It is tokenized with shell-like rules
        (`shlex.split`), so arguments containing spaces may be quoted.
        The source and destination paths are appended as the last
        two arguments.

    Returns
    -------
    dest_path : pathlib.Path
        Path of the re-written association file in the destination folder.

    Raises
    ------
    ValueError
        If ``shellcmd`` contains no command.
    RuntimeError
        If no member survives the exposure-type filters.
    subprocess.CalledProcessError
        If the copy command exits with a non-zero status.
    """
    import shlex

    from jwst.associations.load_as_asn import LoadAsAssociation

    # shlex honours quoting and collapses repeated whitespace; a plain
    # split(" ") would produce empty arguments and break quoted options.
    shellcmd_args = shlex.split(shellcmd)
    if not shellcmd_args:
        # Without a command the member path itself would be executed.
        raise ValueError("shellcmd must contain a command to run.")

    exclude_types = exclude_types if exclude_types is not None else []
    source_asn_path = Path(association)
    if source_folder is None:
        source_folder = source_asn_path.parent
    else:
        source_folder = Path(source_folder)
    if destination is None:
        dest_folder = Path("./")
    else:
        dest_folder = Path(destination)

    # Create the associations. The file is loaded twice so the destination
    # association can be rebuilt without touching the source's product list.
    source_asn = LoadAsAssociation.load(source_asn_path)
    dest_asn = LoadAsAssociation.load(source_asn_path)

    # Create the new association, keeping only the requested exposure types.
    dest_asn["products"] = []
    for src_product in source_asn["products"]:
        members = [
            src_member
            for src_member in src_product["members"]
            if src_member["exptype"] not in exclude_types
            and (exp_types is None or src_member["exptype"] in exp_types)
        ]
        if members:
            product = {"name": src_product["name"], "members": members}
            dest_asn["products"].append(product)

    if not dest_asn["products"]:
        raise RuntimeError("No products could be gathered.")

    # Copy the members.
    for product in dest_asn["products"]:
        for member in product["members"]:
            src_path = Path(member["expname"])
            logger.info("*** Copying member %s", src_path.name)
            # Names whose directory part starts with "." (for example a bare
            # file name or a ".."-relative path) are resolved against the
            # source folder; any other path is used as given.
            if str(src_path.parent).startswith("."):
                src_path = source_folder / src_path
            dest_path = dest_folder / src_path.name
            process_args = shellcmd_args + [str(src_path), str(dest_path)]
            logger.debug("Shell command in use: %s", process_args)
            result = subprocess.run(  # noqa: S603
                process_args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                check=True,
            )
            logger.debug(result.stdout.decode())
            logger.info("...done")
            # The gathered association refers to members by bare file name.
            member["expname"] = dest_path.name

    # Save new association.
    dest_path = dest_folder / source_asn_path.name
    _, serialized = dest_asn.dump()
    logger.info("Copying the association file itself %s", dest_path)
    with dest_path.open("w") as fh:
        fh.write(serialized)

    # That's all folks
    return dest_path
def from_cmdline(args=None):
    """
    Parse command-line arguments into keyword arguments for `asn_gather`.

    As a side effect, the module logger's level is set from the number
    of ``-v`` flags given.

    Parameters
    ----------
    args : [str[,...]]
        List of arguments to parse. Passed unchanged to
        `argparse.ArgumentParser.parse_args`.

    Returns
    -------
    dict
        The parsed options keyed by name: ``association``, ``destination``,
        ``source_folder``, ``exp_types``, ``exclude_types`` and ``shellcmd``.
        The ``verbose`` count is consumed here and not included.
    """
    import argparse

    default_cmd = "rsync -urv --no-perms --chmod=ugo=rwX"

    cli = argparse.ArgumentParser(description="Gather an association to a new location")

    # Required positionals.
    cli.add_argument("association", help="Association to gather.")
    cli.add_argument("destination", help="Folder to copy the association to.")

    # Optional settings.
    cli.add_argument(
        "-s",
        "--source",
        dest="source_folder",
        help=(
            "Folder where the members currently reside. "
            "Default is the folder where the association resides."
        ),
    )
    cli.add_argument(
        "-t",
        "--exp-types",
        dest="exp_types",
        action="append",
        default=None,
        help="Exposure types to gather. If not specified, all exposure types are used.",
    )
    cli.add_argument(
        "-x",
        "--exclude-types",
        dest="exclude_types",
        action="append",
        default=None,
        help="Exposure types to exclude.",
    )
    cli.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Increase verbosity. Specifying multiple times adds more output.",
    )
    cli.add_argument(
        "-c",
        "--cmd",
        dest="shellcmd",
        default=default_cmd,
        help=(
            "Shell command to use to perform the copy. Specify as a single string. "
            'Default: "%(default)s"'
        ),
    )

    namespace = cli.parse_args(args)

    # Set output detail: clamp the -v count to the most verbose level available.
    highest = len(LOGLEVELS) - 1
    logger.setLevel(LOGLEVELS[min(highest, namespace.verbose)])

    # Everything except the verbosity count is handed on to the caller.
    gather_args = dict(vars(namespace))
    gather_args.pop("verbose")
    return gather_args