From c1158ee45f0de52d61fa26c55c369e361a139b78 Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Thu, 20 Feb 2025 18:15:17 -0600 Subject: [PATCH 01/12] addressed non-blocking behavior for both create and update, many logging additions for activity tracing --- zstash/create.py | 8 ++++---- zstash/globus.py | 26 +++++++++++++++++++++----- zstash/hpss.py | 17 ++++++++++++++--- zstash/hpss_utils.py | 2 +- zstash/update.py | 24 ++++++++++++++++++++---- 5 files changed, 60 insertions(+), 17 deletions(-) diff --git a/zstash/create.py b/zstash/create.py index d16287bb..014a322a 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -92,7 +92,7 @@ def create(): # Transfer to HPSS. Always keep a local copy. logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep) logger.debug(f"{ts_utc()}: calling globus_finalize()") globus_finalize(non_blocking=args.non_blocking) @@ -169,9 +169,8 @@ def setup_create() -> Tuple[str, argparse.Namespace]: # Now that we're inside a subcommand, ignore the first two argvs # (zstash create) args: argparse.Namespace = parser.parse_args(sys.argv[2:]) - if args.hpss and args.hpss.lower() == "none": + if not args.hpss or args.hpss.lower() == "none": args.hpss = "none" - if args.non_blocking: args.keep = True if args.verbose: logger.setLevel(logging.DEBUG) @@ -179,7 +178,8 @@ def setup_create() -> Tuple[str, argparse.Namespace]: # Copy configuration config.path = os.path.abspath(args.path) config.hpss = args.hpss - config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) + # config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) + config.maxsize = int(100 * 1024 * args.maxsize) cache: str if args.cache: cache = args.cache diff --git a/zstash/globus.py b/zstash/globus.py index 73a39fa0..d6dbe51f 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -157,6 +157,7 @@ def file_exists(name: str) -> bool: return True return False +gv_push = 0 # C901 'globus_transfer' is too complex (20) def globus_transfer( # noqa: C901 @@ -168,8 +169,10 @@ def globus_transfer( # noqa: C901 global transfer_data global task_id global archive_directory_listing + global gv_push logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") + logger.info(f"{ts_utc()}: DEBUG: non_blocking = {non_blocking}") if not transfer_client: globus_activate("globus://" + remote_ep) if not transfer_client: @@ -215,7 +218,7 @@ def globus_transfer( # noqa: C901 fail_on_quota_errors=True, ) transfer_data.add_item(src_path, dst_path) - transfer_data["label"] = subdir_label + " " + filename + transfer_data["label"] = label try: if task_id: task = transfer_client.get_task(task_id) @@ -225,12 +228,12 @@ def globus_transfer( # noqa: C901 # Presently, we do not, except inadvertantly (if status == PENDING) if prev_task_status == "ACTIVE": logger.info( - f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning." + f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE." ) return "ACTIVE" elif prev_task_status == "SUCCEEDED": logger.info( - f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing." + f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED." ) src_ep = task["source_endpoint_id"] dst_ep = task["destination_endpoint_id"] @@ -243,7 +246,7 @@ def globus_transfer( # noqa: C901 ) else: logger.error( - f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing." + f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}." ) # DEBUG: review accumulated items in TransferData @@ -251,7 +254,8 @@ def globus_transfer( # noqa: C901 attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - print(f" source item: {item['source_path']}") + gv_push += 1 + print(f" (routine) PUSHING (#{gv_push}) STORED source item: {item['source_path']}", flush=True) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") @@ -263,6 +267,7 @@ def globus_transfer( # noqa: C901 f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}" ) + # Nullify the submitted transfer data structure so that a new one will be created on next call. transfer_data = None except TransferAPIError as e: if e.code == "NoCredException": @@ -387,10 +392,21 @@ def globus_finalize(non_blocking: bool = False): global transfer_client global transfer_data global task_id + global gv_push last_task_id = None if transfer_data: + # DEBUG: review accumulated items in TransferData + logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:") + attribs = transfer_data.__dict__ + for item in attribs["data"]["DATA"]: + if item["DATA_TYPE"] == "transfer_item": + gv_push += 1 + print(f" (finalize) PUSHING ({gv_push}) source item: {item['source_path']}", flush=True) + + # SUBMIT new transfer here + logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") try: last_task = submit_transfer_with_checks(transfer_data) last_task_id = last_task.get("task_id") diff --git a/zstash/hpss.py b/zstash/hpss.py index 44bc0f13..931416c5 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -10,6 +10,8 @@ from .settings import get_db_filename, logger from .utils import run_command, ts_utc +prev_transfers = list() +curr_transfers = list() def hpss_transfer( hpss: str, @@ -19,6 +21,9 @@ def hpss_transfer( keep: bool = False, non_blocking: bool = False, ): + global prev_transfers + global curr_transfers + if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) if transfer_type == "put" and file_path != get_db_filename(cache): @@ -71,6 +76,7 @@ def hpss_transfer( endpoint = url.netloc url_path = url.path + curr_transfers.append(file_path) path, name = os.path.split(file_path) # Need to be in local directory for `hsi` to work @@ -113,10 +119,15 @@ def hpss_transfer( if transfer_type == "put": if not keep: if (scheme != "globus") or ( - globus_status == "SUCCEEDED" and not non_blocking + globus_status == "SUCCEEDED" ): - os.remove(file_path) - + # Note: This is intended to fulfill the default removal of successfully-transfered + # tar files when keep=False, irrespective of non-blocking status + logger.info(f"{ts_utc()}: DEBUG: deleting transfered files {prev_transfers}") + for src_path in prev_transfers: + os.remove(src_path) + prev_transfers = curr_transfers + curr_transfers = list() def hpss_put( hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 707f73a8..87325f4f 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -165,7 +165,7 @@ def add_files( # print(process.stdout) logger.info( - f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}" + f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname} [keep, non_blocking] = [{keep}, {non_blocking}]" ) hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) logger.info( diff --git a/zstash/update.py b/zstash/update.py index 56095897..22a3f813 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -38,7 +38,7 @@ def update(): else: failures = result - # Transfer to HPSS. Always keep a local copy. + # Transfer to HPSS. Always keep a local copy of the database. if config.hpss is not None: hpss = config.hpss else: @@ -83,6 +83,13 @@ def setup_update() -> Tuple[argparse.Namespace, str]: help="dry run, only list files to be updated in archive", action="store_true", ) + optional.add_argument( + "--maxsize", + type=float, + help="maximum size of tar archives (in GB, default 256)", + default=256, + ) + optional.add_argument( "--keep", help='if --hpss is not "none", keep the tar files in the local archive (cache) after uploading to the HPSS archive. Default is to delete the tar files. If --hpss=none, this flag has no effect.', @@ -107,8 +114,17 @@ def setup_update() -> Tuple[argparse.Namespace, str]: help="Hard copy symlinks. This is useful for preventing broken links. Note that a broken link will result in a failed update.", ) args: argparse.Namespace = parser.parse_args(sys.argv[2:]) - if args.hpss and args.hpss.lower() == "none": + + if not args.hpss or args.hpss.lower() == "none": args.hpss = "none" + args.keep - True + + # Copy configuration + # config.path = os.path.abspath(args.path) + config.hpss = args.hpss + # config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) + config.maxsize = int(100 * 1024 * args.maxsize) + cache: str if args.cache: cache = args.cache @@ -242,14 +258,14 @@ def update_database( # noqa: C901 try: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks + cur, con, itar, newfiles, cache, keep, args.follow_symlinks, non_blocking=args.non_blocking ) except FileNotFoundError: raise Exception("Archive update failed due to broken symlink.") else: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks + cur, con, itar, newfiles, cache, keep, args.follow_symlinks, non_blocking=args.non_blocking ) # Close database From 86fbbd582593bcd72187eb19b81cf6347a1fce3b Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Thu, 20 Feb 2025 18:19:33 -0600 Subject: [PATCH 02/12] Reset maxsize to production value --- zstash/create.py | 4 ++-- zstash/update.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/zstash/create.py b/zstash/create.py index 014a322a..e69ecf54 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -178,8 +178,8 @@ def setup_create() -> Tuple[str, argparse.Namespace]: # Copy configuration config.path = os.path.abspath(args.path) config.hpss = args.hpss - # config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) - config.maxsize = int(100 * 1024 * args.maxsize) + config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) + # config.maxsize = int(100 * 1024 * args.maxsize) # for test purposes cache: str if args.cache: cache = args.cache diff --git a/zstash/update.py b/zstash/update.py index 22a3f813..372c7790 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -122,8 +122,8 @@ def setup_update() -> Tuple[argparse.Namespace, str]: # Copy configuration # config.path = os.path.abspath(args.path) config.hpss = args.hpss - # config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) - config.maxsize = int(100 * 1024 * args.maxsize) + config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) + # config.maxsize = int(100 * 1024 * args.maxsize) # for test purposes cache: str if args.cache: From 09ac71d375a2d52bb28121a4238444f4215d22bc Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Fri, 21 Feb 2025 17:40:25 -0600 Subject: [PATCH 03/12] fixed wrong keep value and typo, renamed pushcount variable --- zstash/create.py | 2 +- zstash/globus.py | 14 +++++++------- zstash/update.py | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/zstash/create.py b/zstash/create.py index e69ecf54..e5ca204b 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -92,7 +92,7 @@ def create(): # Transfer to HPSS. Always keep a local copy. logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") - hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep) + hpss_put(hpss, get_db_filename(cache), cache, keep=True) logger.debug(f"{ts_utc()}: calling globus_finalize()") globus_finalize(non_blocking=args.non_blocking) diff --git a/zstash/globus.py b/zstash/globus.py index d6dbe51f..87124fa8 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -157,7 +157,7 @@ def file_exists(name: str) -> bool: return True return False -gv_push = 0 +gv_tarfiles_pushed = 0 # C901 'globus_transfer' is too complex (20) def globus_transfer( # noqa: C901 @@ -169,7 +169,7 @@ def globus_transfer( # noqa: C901 global transfer_data global task_id global archive_directory_listing - global gv_push + global gv_tarfiles_pushed logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") logger.info(f"{ts_utc()}: DEBUG: non_blocking = {non_blocking}") @@ -254,8 +254,8 @@ def globus_transfer( # noqa: C901 attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - gv_push += 1 - print(f" (routine) PUSHING (#{gv_push}) STORED source item: {item['source_path']}", flush=True) + gv_tarfiles_pushed += 1 + print(f" (routine) PUSHING (#{gv_tarfiles_pushed}) STORED source item: {item['source_path']}", flush=True) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") @@ -392,7 +392,7 @@ def globus_finalize(non_blocking: bool = False): global transfer_client global transfer_data global task_id - global gv_push + global gv_tarfiles_pushed last_task_id = None @@ -402,8 +402,8 @@ def globus_finalize(non_blocking: bool = False): attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - gv_push += 1 - print(f" (finalize) PUSHING ({gv_push}) source item: {item['source_path']}", flush=True) + gv_tarfiles_pushed += 1 + print(f" (finalize) PUSHING ({gv_tarfiles_pushed}) source item: {item['source_path']}", flush=True) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") diff --git a/zstash/update.py b/zstash/update.py index 372c7790..01f21230 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -117,7 +117,7 @@ def setup_update() -> Tuple[argparse.Namespace, str]: if not args.hpss or args.hpss.lower() == "none": args.hpss = "none" - args.keep - True + args.keep = True # Copy configuration # config.path = os.path.abspath(args.path) From ab7e6f9e1a800bece1ab1c472817f9ad0ff6b375 Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Fri, 21 Feb 2025 18:16:33 -0600 Subject: [PATCH 04/12] added parentheses for logic clarity --- zstash/update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zstash/update.py b/zstash/update.py index 01f21230..d62211dc 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -115,7 +115,7 @@ def setup_update() -> Tuple[argparse.Namespace, str]: ) args: argparse.Namespace = parser.parse_args(sys.argv[2:]) - if not args.hpss or args.hpss.lower() == "none": + if (not args.hpss) or (args.hpss.lower() == "none"): args.hpss = "none" args.keep = True From be08d8bc8632580885e0f1a98fd896cb3d9124e5 Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Thu, 6 Mar 2025 14:24:10 -0600 Subject: [PATCH 05/12] adjust block wait polling interval --- zstash/globus.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zstash/globus.py b/zstash/globus.py index 87124fa8..928f6906 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -287,7 +287,7 @@ def globus_transfer( # noqa: C901 task_status = "UNKNOWN" if not non_blocking: task_status = globus_block_wait( - task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5 + task_id=task_id, wait_timeout=7200, polling_interval=900, max_retries=5 ) else: logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}") @@ -316,7 +316,7 @@ def globus_block_wait( try: # Wait for the task to complete transfer_client.task_wait( - task_id, timeout=wait_timeout, polling_interval=10 + task_id, timeout=wait_timeout, polling_interval=900 ) except Exception as e: logger.error(f"Unexpected Exception: {e}") @@ -355,7 +355,7 @@ def globus_wait(task_id: str): with 20 second timeout limit. If the task is ACTIVE after time runs out 'task_wait' returns False, and True otherwise. """ - while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20): + while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20): pass """ The Globus transfer job (task) has been finished (SUCCEEDED or FAILED). From 734ea5ca025f391921a1ba174585e5b01bf7d0a7 Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Thu, 6 Mar 2025 14:08:14 -0800 Subject: [PATCH 06/12] Clean up code --- tests3/README_TEST_BLOCKING | 4 ++-- tests3/snapshot.sh | 2 -- tests3/test_zstash_blocking.sh | 1 - zstash/create.py | 3 +-- zstash/globus.py | 15 ++++++++++++--- zstash/hpss.py | 14 ++++++++------ zstash/update.py | 19 ++++++++++++++++--- 7 files changed, 39 insertions(+), 19 deletions(-) diff --git a/tests3/README_TEST_BLOCKING b/tests3/README_TEST_BLOCKING index 95b81d0e..e367335b 100644 --- a/tests3/README_TEST_BLOCKING +++ b/tests3/README_TEST_BLOCKING @@ -39,7 +39,7 @@ working directory: [CWD]/dst_data/ - destination for Globus transfer of archives. - + [CWD]/tmp_cache/ - [Optional] alternative location for tar-file generation. @@ -86,7 +86,7 @@ to running Globus transfers. It is suggested that you run the test script with - test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1 + test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1 so that your command prompt returns and you can monitor progress with diff --git a/tests3/snapshot.sh b/tests3/snapshot.sh index 869812dc..7bbaef79 100755 --- a/tests3/snapshot.sh +++ b/tests3/snapshot.sh @@ -10,5 +10,3 @@ ls -l src_data/zstash echo "" echo "tmp_cache:" ls -l tmp_cache - - diff --git a/tests3/test_zstash_blocking.sh b/tests3/test_zstash_blocking.sh index 2d58622d..bae334a8 100755 --- a/tests3/test_zstash_blocking.sh +++ b/tests3/test_zstash_blocking.sh @@ -70,4 +70,3 @@ fi echo "Testing Completed" exit 0 - diff --git a/zstash/create.py b/zstash/create.py index e5ca204b..de1b646c 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -169,7 +169,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]: # Now that we're inside a subcommand, ignore the first two argvs # (zstash create) args: argparse.Namespace = parser.parse_args(sys.argv[2:]) - if not args.hpss or args.hpss.lower() == "none": + if (not args.hpss) or (args.hpss.lower() == "none"): args.hpss = "none" args.keep = True if args.verbose: @@ -179,7 +179,6 @@ def setup_create() -> Tuple[str, argparse.Namespace]: config.path = os.path.abspath(args.path) config.hpss = args.hpss config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) - # config.maxsize = int(100 * 1024 * args.maxsize) # for test purposes cache: str if args.cache: cache = args.cache diff --git a/zstash/globus.py b/zstash/globus.py index 928f6906..f5a52073 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -157,8 +157,11 @@ def file_exists(name: str) -> bool: return True return False + +# TODO: What does gv stand for? Globus something? Global variable? gv_tarfiles_pushed = 0 + # C901 'globus_transfer' is too complex (20) def globus_transfer( # noqa: C901 remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool @@ -172,7 +175,7 @@ def globus_transfer( # noqa: C901 global gv_tarfiles_pushed logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") - logger.info(f"{ts_utc()}: DEBUG: non_blocking = {non_blocking}") + logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}") if not transfer_client: globus_activate("globus://" + remote_ep) if not transfer_client: @@ -255,7 +258,10 @@ def globus_transfer( # noqa: C901 for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": gv_tarfiles_pushed += 1 - print(f" (routine) PUSHING (#{gv_tarfiles_pushed}) STORED source item: {item['source_path']}", flush=True) + print( + f" (routine) PUSHING (#{gv_tarfiles_pushed}) STORED source item: {item['source_path']}", + flush=True, + ) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") @@ -403,7 +409,10 @@ def globus_finalize(non_blocking: bool = False): for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": gv_tarfiles_pushed += 1 - print(f" (finalize) PUSHING ({gv_tarfiles_pushed}) source item: {item['source_path']}", flush=True) + print( + f" (finalize) PUSHING ({gv_tarfiles_pushed}) source item: {item['source_path']}", + flush=True, + ) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") diff --git a/zstash/hpss.py b/zstash/hpss.py index 931416c5..1b481700 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -10,8 +10,9 @@ from .settings import get_db_filename, logger from .utils import run_command, ts_utc -prev_transfers = list() -curr_transfers = list() +prev_transfers: List[str] = list() +curr_transfers: List[str] = list() + def hpss_transfer( hpss: str, @@ -118,17 +119,18 @@ def hpss_transfer( if transfer_type == "put": if not keep: - if (scheme != "globus") or ( - globus_status == "SUCCEEDED" - ): + if (scheme != "globus") or (globus_status == "SUCCEEDED"): # Note: This is intended to fulfill the default removal of successfully-transfered # tar files when keep=False, irrespective of non-blocking status - logger.info(f"{ts_utc()}: DEBUG: deleting transfered files {prev_transfers}") + logger.debug( + f"{ts_utc()}: deleting transfered files {prev_transfers}" + ) for src_path in prev_transfers: os.remove(src_path) prev_transfers = curr_transfers curr_transfers = list() + def hpss_put( hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False ): diff --git a/zstash/update.py b/zstash/update.py index d62211dc..0be520ab 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -123,7 +123,6 @@ def setup_update() -> Tuple[argparse.Namespace, str]: # config.path = os.path.abspath(args.path) config.hpss = args.hpss config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) - # config.maxsize = int(100 * 1024 * args.maxsize) # for test purposes cache: str if args.cache: @@ -258,14 +257,28 @@ def update_database( # noqa: C901 try: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks, non_blocking=args.non_blocking + cur, + con, + itar, + newfiles, + cache, + keep, + args.follow_symlinks, + non_blocking=args.non_blocking, ) except FileNotFoundError: raise Exception("Archive update failed due to broken symlink.") else: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks, non_blocking=args.non_blocking + cur, + con, + itar, + newfiles, + cache, + keep, + args.follow_symlinks, + non_blocking=args.non_blocking, ) # Close database From 49fd87ba767b79a2630e29ff9b3d5a5dc5859f4c Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Fri, 7 Mar 2025 13:23:42 -0800 Subject: [PATCH 07/12] Add update tests --- tests/scripts_unit_tests/README.md | 4 + .../test_update_non_empty_hpss.bash | 92 +++++++++++++++++++ tests/test_update.py | 69 ++++++++++---- zstash/hpss.py | 15 ++- 4 files changed, 160 insertions(+), 20 deletions(-) create mode 100644 tests/scripts_unit_tests/README.md create mode 100755 tests/scripts_unit_tests/test_update_non_empty_hpss.bash diff --git a/tests/scripts_unit_tests/README.md b/tests/scripts_unit_tests/README.md new file mode 100644 index 00000000..ca41e4d6 --- /dev/null +++ b/tests/scripts_unit_tests/README.md @@ -0,0 +1,4 @@ +The unit tests, in script form. + +The unit tests are hard to follow via Python, +so it may be easier to run and debug the logic using these scripts. diff --git a/tests/scripts_unit_tests/test_update_non_empty_hpss.bash b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash new file mode 100755 index 00000000..26b039dd --- /dev/null +++ b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash @@ -0,0 +1,92 @@ +#!/bin/bash + +hpss_path=zstash_test # Set via `HPSS_ARCHIVE = "zstash_test"` +cache=zstash # Set via `self.cache = "zstash"` + +# base.setupDirs ############################################################## +use_hpss=true +test_dir=zstash_test + +# Create files and directories +echo "Creating files." +mkdir -p ${test_dir} +mkdir -p ${test_dir}/empty_dir +mkdir -p ${test_dir}/dir + +echo "file0 stuff" > ${test_dir}/file0.txt +echo "" > ${test_dir}/file_empty.txt +echo "file1 stuff" > ${test_dir}/dir/file1.txt + +# Symbolic (soft) link (points to a file name which points to an inode) +# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt +# The python `os.symlink` call omits the first `test_dir` +# because it simply looks in the same directory for the file to link to. +ln -s ${test_dir}/file0.txt ${test_dir}/file_0_soft.txt +# Bad symbolic (soft) link (points to a file name which points to an inode) +ln -s ${test_dir}/file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt +# Hard link (points to an inode directly) +ln -s ${test_dir}/file0.txt ${test_dir}/file0_hard.txt + +# base.create ################################################################# +echo "Adding files to HPSS" +zstash create --hpss=${hpss_path} ${test_dir} +# Archives 000000.tar +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, index.db + +# base.add_files ############################################################## +echo "Testing update with an actual change" +mkdir -p ${test_dir}/dir2 +echo "file2 stuff" > ${test_dir}/dir2/file2.txt +echo "file1 stuff with changes" > ${test_dir}/dir/file1.txt +cd ${test_dir} +zstash update -v --hpss=${hpss_path} +# Archives 000001.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, index.db + +echo "Adding more files to the HPSS archive." +echo "file3 stuff" > ${test_dir}/file3.txt +cd ${test_dir} +zstash update --hpss=${hpss_path} +# Archives 000002.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, index.db + +echo "file4 stuff" > ${test_dir}/file4.txt +cd ${test_dir} +zstash update --hpss=${hpss_path} +# Archives 000003.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, index.db + +echo "file5 stuff" > ${test_dir}/file5.txt +cd ${test_dir} +zstash update --hpss=${hpss_path} +# Archives 000004.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, 000004.tar, index.db + +# back in test_update.helperUpdateNonEmpty #################################### +echo "Cache check actually performed in the unit test:" +ls -l ${test_dir}/${cache} # just index.db +# Strangely, showing ['index.db', '000001.tar', '000000.tar', '000002.tar', '000003.tar', '000004.tar'] on the automated test... + +# base.tearDown ############################################################### +echo "Removing test files, both locally and at the HPSS repo" +rm -rf ${test_dir} +hsi rm -R ${hpss_path} diff --git a/tests/test_update.py b/tests/test_update.py index 790e51c0..0b395bcb 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -141,33 +141,64 @@ def helperUpdateCache(self, test_name, hpss_path, zstash_path=ZSTASH_PATH): ) self.stop(error_message) - def testUpdate(self): - self.helperUpdate("testUpdate", "none") + def helperUpdateNonEmpty(self, test_name, hpss_path, zstash_path=ZSTASH_PATH): + """ + Test `zstash update`. + """ + self.hpss_path = hpss_path + use_hpss = self.setupDirs(test_name) + self.create(use_hpss, zstash_path) + self.add_files(use_hpss, zstash_path) + files = os.listdir("{}/{}".format(self.test_dir, self.cache)) + if use_hpss: + expected_files = ["index.db"] + else: + expected_files = [ + "index.db", + "000003.tar", + "000004.tar", + "000000.tar", + "000001.tar", + "000002.tar", + ] + if not compare(files, expected_files): + error_message = f"The zstash cache {self.test_dir}/{self.cache} does not contain expected files.\nIt has: {files}" + self.stop(error_message) - def testUpdateHPSS(self): - self.conditional_hpss_skip() - self.helperUpdate("testUpdateHPSS", HPSS_ARCHIVE) + # def testUpdate(self): + # self.helperUpdate("testUpdate", "none") - def testUpdateDryRun(self): - self.helperUpdateDryRun("testUpdateDryRun", "none") + # def testUpdateHPSS(self): + # self.conditional_hpss_skip() + # self.helperUpdate("testUpdateHPSS", HPSS_ARCHIVE) - def testUpdateDryRunHPSS(self): - self.conditional_hpss_skip() - self.helperUpdateDryRun("testUpdateDryRunHPSS", HPSS_ARCHIVE) + # def testUpdateDryRun(self): + # self.helperUpdateDryRun("testUpdateDryRun", "none") - def testUpdateKeep(self): - self.helperUpdateKeep("testUpdateKeep", "none") + # def testUpdateDryRunHPSS(self): + # self.conditional_hpss_skip() + # self.helperUpdateDryRun("testUpdateDryRunHPSS", HPSS_ARCHIVE) - def testUpdateKeepHPSS(self): - self.conditional_hpss_skip() - self.helperUpdateKeep("testUpdateKeepHPSS", HPSS_ARCHIVE) + # def testUpdateKeep(self): + # self.helperUpdateKeep("testUpdateKeep", "none") + + # def testUpdateKeepHPSS(self): + # self.conditional_hpss_skip() + # self.helperUpdateKeep("testUpdateKeepHPSS", HPSS_ARCHIVE) + + # def testUpdateCache(self): + # self.helperUpdateCache("testUpdateCache", "none") + + # def testUpdateCacheHPSS(self): + # self.conditional_hpss_skip() + # self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE) - def testUpdateCache(self): - self.helperUpdateCache("testUpdateCache", "none") + def testUpdateNonEmpty(self): + self.helperUpdateNonEmpty("testUpdateNonEmpty", "none") - def testUpdateCacheHPSS(self): + def testUpdateNonEmptyHPSS(self): self.conditional_hpss_skip() - self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE) + self.helperUpdateNonEmpty("testUpdateNonEmptyHPSS", HPSS_ARCHIVE) if __name__ == "__main__": diff --git a/zstash/hpss.py b/zstash/hpss.py index 1b481700..d2870422 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -25,6 +25,13 @@ def hpss_transfer( global prev_transfers global curr_transfers + logger.info( + f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {prev_transfers}" + ) + logger.info( + f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {curr_transfers}" + ) + if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) if transfer_type == "put" and file_path != get_db_filename(cache): @@ -78,6 +85,9 @@ def hpss_transfer( url_path = url.path curr_transfers.append(file_path) + logger.info( + f"{ts_utc()}: curr_transfers has been appended to, is now {curr_transfers}" + ) path, name = os.path.split(file_path) # Need to be in local directory for `hsi` to work @@ -122,13 +132,16 @@ def hpss_transfer( if (scheme != "globus") or (globus_status == "SUCCEEDED"): # Note: This is intended to fulfill the default removal of successfully-transfered # tar files when keep=False, irrespective of non-blocking status - logger.debug( + logger.info( f"{ts_utc()}: deleting transfered files {prev_transfers}" ) for src_path in prev_transfers: os.remove(src_path) prev_transfers = curr_transfers curr_transfers = list() + logger.info( + f"{ts_utc()}: prev_transfers has been set to {prev_transfers}" + ) def hpss_put( From 8050fb59df604f804ee13d0cc4240df568c3bd89 Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Mon, 10 Mar 2025 15:19:55 -0700 Subject: [PATCH 08/12] Update tests passing --- .../test_update_non_empty_hpss.bash | 1 - tests/test_update.py | 40 +++++++++---------- zstash/create.py | 2 +- zstash/hpss.py | 10 ++++- zstash/update.py | 2 +- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/tests/scripts_unit_tests/test_update_non_empty_hpss.bash b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash index 26b039dd..c1d51354 100755 --- a/tests/scripts_unit_tests/test_update_non_empty_hpss.bash +++ b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash @@ -84,7 +84,6 @@ hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, 000004. # back in test_update.helperUpdateNonEmpty #################################### echo "Cache check actually performed in the unit test:" ls -l ${test_dir}/${cache} # just index.db -# Strangely, showing ['index.db', '000001.tar', '000000.tar', '000002.tar', '000003.tar', '000004.tar'] on the automated test... # base.tearDown ############################################################### echo "Removing test files, both locally and at the HPSS repo" diff --git a/tests/test_update.py b/tests/test_update.py index 0b395bcb..4d8507e7 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -165,33 +165,33 @@ def helperUpdateNonEmpty(self, test_name, hpss_path, zstash_path=ZSTASH_PATH): error_message = f"The zstash cache {self.test_dir}/{self.cache} does not contain expected files.\nIt has: {files}" self.stop(error_message) - # def testUpdate(self): - # self.helperUpdate("testUpdate", "none") + def testUpdate(self): + self.helperUpdate("testUpdate", "none") - # def testUpdateHPSS(self): - # self.conditional_hpss_skip() - # self.helperUpdate("testUpdateHPSS", HPSS_ARCHIVE) + def testUpdateHPSS(self): + self.conditional_hpss_skip() + self.helperUpdate("testUpdateHPSS", HPSS_ARCHIVE) - # def testUpdateDryRun(self): - # self.helperUpdateDryRun("testUpdateDryRun", "none") + def testUpdateDryRun(self): + self.helperUpdateDryRun("testUpdateDryRun", "none") - # def testUpdateDryRunHPSS(self): - # self.conditional_hpss_skip() - # self.helperUpdateDryRun("testUpdateDryRunHPSS", HPSS_ARCHIVE) + def testUpdateDryRunHPSS(self): + self.conditional_hpss_skip() + self.helperUpdateDryRun("testUpdateDryRunHPSS", HPSS_ARCHIVE) - # def testUpdateKeep(self): - # self.helperUpdateKeep("testUpdateKeep", "none") + def testUpdateKeep(self): + self.helperUpdateKeep("testUpdateKeep", "none") - # def testUpdateKeepHPSS(self): - # self.conditional_hpss_skip() - # self.helperUpdateKeep("testUpdateKeepHPSS", HPSS_ARCHIVE) + def testUpdateKeepHPSS(self): + self.conditional_hpss_skip() + self.helperUpdateKeep("testUpdateKeepHPSS", HPSS_ARCHIVE) - # def testUpdateCache(self): - # self.helperUpdateCache("testUpdateCache", "none") + def testUpdateCache(self): + self.helperUpdateCache("testUpdateCache", "none") - # def testUpdateCacheHPSS(self): - # self.conditional_hpss_skip() - # self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE) + def testUpdateCacheHPSS(self): + self.conditional_hpss_skip() + self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE) def testUpdateNonEmpty(self): self.helperUpdateNonEmpty("testUpdateNonEmpty", "none") diff --git a/zstash/create.py b/zstash/create.py index de1b646c..c544896c 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -92,7 +92,7 @@ def create(): # Transfer to HPSS. Always keep a local copy. logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) logger.debug(f"{ts_utc()}: calling globus_finalize()") globus_finalize(non_blocking=args.non_blocking) diff --git a/zstash/hpss.py b/zstash/hpss.py index d2870422..20c5d96b 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -21,6 +21,7 @@ def hpss_transfer( cache: str, keep: bool = False, non_blocking: bool = False, + is_index: bool = False, ): global prev_transfers global curr_transfers @@ -145,12 +146,17 @@ def hpss_transfer( def hpss_put( - hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False + hpss: str, + file_path: str, + cache: str, + keep: bool = True, + non_blocking: bool = False, + is_index=False, ): """ Put a file to the HPSS archive. """ - hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking) + hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking, is_index) def hpss_get(hpss: str, file_path: str, cache: str): diff --git a/zstash/update.py b/zstash/update.py index 0be520ab..9ab565b8 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -43,7 +43,7 @@ def update(): hpss = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) globus_finalize(non_blocking=args.non_blocking) From f4a661c5ad386e5cf7f9a90d507b1de74105c58d Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Mon, 10 Mar 2025 16:50:47 -0700 Subject: [PATCH 09/12] Globus test passing --- tests/scripts_unit_tests/test_ls_globus.bash | 47 +++++++++++++++++++ .../test_update_non_empty_hpss.bash | 1 - zstash/globus.py | 8 +++- 3 files changed, 53 insertions(+), 3 deletions(-) create mode 100755 tests/scripts_unit_tests/test_ls_globus.bash diff --git a/tests/scripts_unit_tests/test_ls_globus.bash b/tests/scripts_unit_tests/test_ls_globus.bash new file mode 100755 index 00000000..7a0ee44f --- /dev/null +++ b/tests/scripts_unit_tests/test_ls_globus.bash @@ -0,0 +1,47 @@ +#!/bin/bash + +hpss_globus_endpoint="6c54cade-bde5-45c1-bdea-f4bd71dba2cc" +hpss_path="globus://${hpss_globus_endpoint}/~/zstash_test/" +cache=zstash # Set via `self.cache = "zstash"` + +# base.setupDirs ############################################################## +use_hpss=true +test_dir=zstash_test + +# Create files and directories +echo "Creating files." +mkdir -p ${test_dir} +mkdir -p ${test_dir}/empty_dir +mkdir -p ${test_dir}/dir + +echo "file0 stuff" > ${test_dir}/file0.txt +echo "" > ${test_dir}/file_empty.txt +echo "file1 stuff" > ${test_dir}/dir/file1.txt + +# Symbolic (soft) link (points to a file name which points to an inode) +# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt +# The python `os.symlink` call omits the first `test_dir` +# because it simply looks in the same directory for the file to link to. +ln -s ${test_dir}/file0.txt ${test_dir}/file_0_soft.txt +# Bad symbolic (soft) link (points to a file name which points to an inode) +ln -s ${test_dir}/file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt +# Hard link (points to an inode directly) +ln -s ${test_dir}/file0.txt ${test_dir}/file0_hard.txt + +# base.create ################################################################# +echo "Adding files to HPSS" +zstash create --hpss=${hpss_path} ${test_dir} +# Archives 000000.tar +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, index.db + +# back in test_globus.helperLsGlobus ########################################## +cd ${test_dir} +zstash ls --hpss=${hpss_path} +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} +echo "HPSS:" +hsi ls -l ${hpss_path} diff --git a/tests/scripts_unit_tests/test_update_non_empty_hpss.bash b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash index c1d51354..4c213d4e 100755 --- a/tests/scripts_unit_tests/test_update_non_empty_hpss.bash +++ b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash @@ -4,7 +4,6 @@ hpss_path=zstash_test # Set via `HPSS_ARCHIVE = "zstash_test"` cache=zstash # Set via `self.cache = "zstash"` # base.setupDirs ############################################################## -use_hpss=true test_dir=zstash_test # Create files and directories diff --git a/zstash/globus.py b/zstash/globus.py index f5a52073..b5a78d1f 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -293,7 +293,7 @@ def globus_transfer( # noqa: C901 task_status = "UNKNOWN" if not non_blocking: task_status = globus_block_wait( - task_id=task_id, wait_timeout=7200, polling_interval=900, max_retries=5 + task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5 ) else: logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}") @@ -321,9 +321,13 @@ def globus_block_wait( while retry_count < max_retries: try: # Wait for the task to complete + logger.info( + f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}" + ) transfer_client.task_wait( - task_id, timeout=wait_timeout, polling_interval=900 + task_id, timeout=wait_timeout, polling_interval=10 ) + logger.info(f"{ts_utc()}: done with wait") except Exception as e: logger.error(f"Unexpected Exception: {e}") else: From 33ef84806fe74a727c6e80003d1dd31d49ce7db4 Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Mon, 10 Mar 2025 17:11:36 -0700 Subject: [PATCH 10/12] Remove unused variable --- tests/scripts_unit_tests/test_ls_globus.bash | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/scripts_unit_tests/test_ls_globus.bash b/tests/scripts_unit_tests/test_ls_globus.bash index 7a0ee44f..b1677a80 100755 --- a/tests/scripts_unit_tests/test_ls_globus.bash +++ b/tests/scripts_unit_tests/test_ls_globus.bash @@ -5,7 +5,6 @@ hpss_path="globus://${hpss_globus_endpoint}/~/zstash_test/" cache=zstash # Set via `self.cache = "zstash"` # base.setupDirs ############################################################## -use_hpss=true test_dir=zstash_test # Create files and directories From 59fa4428555764ef9c7af61ad56f4d28c9cbd02a Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Mon, 10 Mar 2025 18:12:34 -0700 Subject: [PATCH 11/12] Fix logging statements --- zstash/hpss.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/zstash/hpss.py b/zstash/hpss.py index 20c5d96b..24603388 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -29,9 +29,9 @@ def hpss_transfer( logger.info( f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {prev_transfers}" ) - logger.info( - f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {curr_transfers}" - ) + # logger.debug( + # f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {curr_transfers}" + # ) if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) @@ -86,9 +86,9 @@ def hpss_transfer( url_path = url.path curr_transfers.append(file_path) - logger.info( - f"{ts_utc()}: curr_transfers has been appended to, is now {curr_transfers}" - ) + # logger.debug( + # f"{ts_utc()}: curr_transfers has been appended to, is now {curr_transfers}" + # ) path, name = os.path.split(file_path) # Need to be in local directory for `hsi` to work @@ -133,7 +133,7 @@ def hpss_transfer( if (scheme != "globus") or (globus_status == "SUCCEEDED"): # Note: This is intended to fulfill the default removal of successfully-transfered # tar files when keep=False, irrespective of non-blocking status - logger.info( + logger.debug( f"{ts_utc()}: deleting transfered files {prev_transfers}" ) for src_path in prev_transfers: From 78476eb98762251d8b06ec2774e680dc4138a4d4 Mon Sep 17 00:00:00 2001 From: Ryan Forsyth Date: Tue, 11 Mar 2025 10:15:55 -0700 Subject: [PATCH 12/12] Rename gv to global_variable --- zstash/globus.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/zstash/globus.py b/zstash/globus.py index b5a78d1f..84c12cdf 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -158,8 +158,7 @@ def file_exists(name: str) -> bool: return False -# TODO: What does gv stand for? Globus something? Global variable? -gv_tarfiles_pushed = 0 +global_variable_tarfiles_pushed = 0 # C901 'globus_transfer' is too complex (20) @@ -172,7 +171,7 @@ def globus_transfer( # noqa: C901 global transfer_data global task_id global archive_directory_listing - global gv_tarfiles_pushed + global global_variable_tarfiles_pushed logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}") @@ -257,9 +256,9 @@ def globus_transfer( # noqa: C901 attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - gv_tarfiles_pushed += 1 + global_variable_tarfiles_pushed += 1 print( - f" (routine) PUSHING (#{gv_tarfiles_pushed}) STORED source item: {item['source_path']}", + f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}", flush=True, ) @@ -402,7 +401,7 @@ def globus_finalize(non_blocking: bool = False): global transfer_client global transfer_data global task_id - global gv_tarfiles_pushed + global global_variable_tarfiles_pushed last_task_id = None @@ -412,9 +411,9 @@ def globus_finalize(non_blocking: bool = False): attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - gv_tarfiles_pushed += 1 + global_variable_tarfiles_pushed += 1 print( - f" (finalize) PUSHING ({gv_tarfiles_pushed}) source item: {item['source_path']}", + f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}", flush=True, )