diff --git a/tests/scripts_unit_tests/README.md b/tests/scripts_unit_tests/README.md new file mode 100644 index 00000000..ca41e4d6 --- /dev/null +++ b/tests/scripts_unit_tests/README.md @@ -0,0 +1,4 @@ +The unit tests, in script form. + +The unit tests are hard to follow via Python, +so it may be easier to run and debug the logic using these scripts. diff --git a/tests/scripts_unit_tests/test_ls_globus.bash b/tests/scripts_unit_tests/test_ls_globus.bash new file mode 100755 index 00000000..b1677a80 --- /dev/null +++ b/tests/scripts_unit_tests/test_ls_globus.bash @@ -0,0 +1,46 @@ +#!/bin/bash + +hpss_globus_endpoint="6c54cade-bde5-45c1-bdea-f4bd71dba2cc" +hpss_path="globus://${hpss_globus_endpoint}/~/zstash_test/" +cache=zstash # Set via `self.cache = "zstash"` + +# base.setupDirs ############################################################## +test_dir=zstash_test + +# Create files and directories +echo "Creating files." +mkdir -p ${test_dir} +mkdir -p ${test_dir}/empty_dir +mkdir -p ${test_dir}/dir + +echo "file0 stuff" > ${test_dir}/file0.txt +echo "" > ${test_dir}/file_empty.txt +echo "file1 stuff" > ${test_dir}/dir/file1.txt + +# Symbolic (soft) link (points to a file name which points to an inode) +# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt +# The python `os.symlink` call omits the first `test_dir` +# because it simply looks in the same directory for the file to link to. 
+ln -s file0.txt ${test_dir}/file_0_soft.txt +# Bad symbolic (soft) link (points to a file name which points to an inode) +ln -s ${test_dir}/file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt +# Hard link (points to an inode directly) +ln ${test_dir}/file0.txt ${test_dir}/file0_hard.txt + +# base.create ################################################################# +echo "Adding files to HPSS" +zstash create --hpss=${hpss_path} ${test_dir} +# Archives 000000.tar +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, index.db + +# back in test_globus.helperLsGlobus ########################################## +cd ${test_dir} +zstash ls --hpss=${hpss_path} +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} +echo "HPSS:" +hsi ls -l ${hpss_path} diff --git a/tests/scripts_unit_tests/test_update_non_empty_hpss.bash b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash new file mode 100755 index 00000000..4c213d4e --- /dev/null +++ b/tests/scripts_unit_tests/test_update_non_empty_hpss.bash @@ -0,0 +1,90 @@ +#!/bin/bash + +hpss_path=zstash_test # Set via `HPSS_ARCHIVE = "zstash_test"` +cache=zstash # Set via `self.cache = "zstash"` + +# base.setupDirs ############################################################## +test_dir=zstash_test + +# Create files and directories +echo "Creating files." +mkdir -p ${test_dir} +mkdir -p ${test_dir}/empty_dir +mkdir -p ${test_dir}/dir + +echo "file0 stuff" > ${test_dir}/file0.txt +echo "" > ${test_dir}/file_empty.txt +echo "file1 stuff" > ${test_dir}/dir/file1.txt + +# Symbolic (soft) link (points to a file name which points to an inode) +# ${test_dir}/file_0_soft.txt points to ${test_dir}/file0.txt +# The python `os.symlink` call omits the first `test_dir` +# because it simply looks in the same directory for the file to link to. 
+ln -s file0.txt ${test_dir}/file_0_soft.txt +# Bad symbolic (soft) link (points to a file name which points to an inode) +ln -s ${test_dir}/file0_that_doesnt_exist.txt ${test_dir}/file0_soft_bad.txt +# Hard link (points to an inode directly) +ln ${test_dir}/file0.txt ${test_dir}/file0_hard.txt + +# base.create ################################################################# +echo "Adding files to HPSS" +zstash create --hpss=${hpss_path} ${test_dir} +# Archives 000000.tar +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, index.db + +# base.add_files ############################################################## +echo "Testing update with an actual change" +mkdir -p ${test_dir}/dir2 +echo "file2 stuff" > ${test_dir}/dir2/file2.txt +echo "file1 stuff with changes" > ${test_dir}/dir/file1.txt +cd ${test_dir} +zstash update -v --hpss=${hpss_path} +# Archives 000001.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, index.db + +echo "Adding more files to the HPSS archive." +echo "file3 stuff" > ${test_dir}/file3.txt +cd ${test_dir} +zstash update --hpss=${hpss_path} +# Archives 000002.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, index.db + +echo "file4 stuff" > ${test_dir}/file4.txt +cd ${test_dir} +zstash update --hpss=${hpss_path} +# Archives 000003.tar +cd .. +echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, index.db + +echo "file5 stuff" > ${test_dir}/file5.txt +cd ${test_dir} +zstash update --hpss=${hpss_path} +# Archives 000004.tar +cd .. 
+echo "Cache:" +ls -l ${test_dir}/${cache} # just index.db +echo "HPSS:" +hsi ls -l ${hpss_path} # 000000.tar, 000001.tar, 000002.tar, 000003.tar, 000004.tar, index.db + +# back in test_update.helperUpdateNonEmpty #################################### +echo "Cache check actually performed in the unit test:" +ls -l ${test_dir}/${cache} # just index.db + +# base.tearDown ############################################################### +echo "Removing test files, both locally and at the HPSS repo" +rm -rf ${test_dir} +hsi rm -R ${hpss_path} diff --git a/tests/test_update.py b/tests/test_update.py index 790e51c0..4d8507e7 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -141,6 +141,30 @@ def helperUpdateCache(self, test_name, hpss_path, zstash_path=ZSTASH_PATH): ) self.stop(error_message) + def helperUpdateNonEmpty(self, test_name, hpss_path, zstash_path=ZSTASH_PATH): + """ + Test `zstash update`. + """ + self.hpss_path = hpss_path + use_hpss = self.setupDirs(test_name) + self.create(use_hpss, zstash_path) + self.add_files(use_hpss, zstash_path) + files = os.listdir("{}/{}".format(self.test_dir, self.cache)) + if use_hpss: + expected_files = ["index.db"] + else: + expected_files = [ + "index.db", + "000003.tar", + "000004.tar", + "000000.tar", + "000001.tar", + "000002.tar", + ] + if not compare(files, expected_files): + error_message = f"The zstash cache {self.test_dir}/{self.cache} does not contain expected files.\nIt has: {files}" + self.stop(error_message) + def testUpdate(self): self.helperUpdate("testUpdate", "none") @@ -169,6 +193,13 @@ def testUpdateCacheHPSS(self): self.conditional_hpss_skip() self.helperUpdateCache("testUpdateCacheHPSS", HPSS_ARCHIVE) + def testUpdateNonEmpty(self): + self.helperUpdateNonEmpty("testUpdateNonEmpty", "none") + + def testUpdateNonEmptyHPSS(self): + self.conditional_hpss_skip() + self.helperUpdateNonEmpty("testUpdateNonEmptyHPSS", HPSS_ARCHIVE) + if __name__ == "__main__": unittest.main() diff --git 
a/tests3/README_TEST_BLOCKING b/tests3/README_TEST_BLOCKING index 95b81d0e..e367335b 100644 --- a/tests3/README_TEST_BLOCKING +++ b/tests3/README_TEST_BLOCKING @@ -39,7 +39,7 @@ working directory: [CWD]/dst_data/ - destination for Globus transfer of archives. - + [CWD]/tmp_cache/ - [Optional] alternative location for tar-file generation. @@ -86,7 +86,7 @@ to running Globus transfers. It is suggested that you run the test script with - test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1 + test_zstash_blocking.sh (BLOCKING|NON_BLOCKING) > your_logfile 2>&1 so that your command prompt returns and you can monitor progress with diff --git a/tests3/snapshot.sh b/tests3/snapshot.sh index 869812dc..7bbaef79 100755 --- a/tests3/snapshot.sh +++ b/tests3/snapshot.sh @@ -10,5 +10,3 @@ ls -l src_data/zstash echo "" echo "tmp_cache:" ls -l tmp_cache - - diff --git a/tests3/test_zstash_blocking.sh b/tests3/test_zstash_blocking.sh index 2d58622d..bae334a8 100755 --- a/tests3/test_zstash_blocking.sh +++ b/tests3/test_zstash_blocking.sh @@ -70,4 +70,3 @@ fi echo "Testing Completed" exit 0 - diff --git a/zstash/create.py b/zstash/create.py index d16287bb..c544896c 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -92,7 +92,7 @@ def create(): # Transfer to HPSS. Always keep a local copy. 
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) logger.debug(f"{ts_utc()}: calling globus_finalize()") globus_finalize(non_blocking=args.non_blocking) @@ -169,9 +169,8 @@ def setup_create() -> Tuple[str, argparse.Namespace]: # Now that we're inside a subcommand, ignore the first two argvs # (zstash create) args: argparse.Namespace = parser.parse_args(sys.argv[2:]) - if args.hpss and args.hpss.lower() == "none": + if (not args.hpss) or (args.hpss.lower() == "none"): args.hpss = "none" - if args.non_blocking: args.keep = True if args.verbose: logger.setLevel(logging.DEBUG) diff --git a/zstash/globus.py b/zstash/globus.py index 73a39fa0..84c12cdf 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -158,6 +158,9 @@ def file_exists(name: str) -> bool: return False +global_variable_tarfiles_pushed = 0 + + # C901 'globus_transfer' is too complex (20) def globus_transfer( # noqa: C901 remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool @@ -168,8 +171,10 @@ def globus_transfer( # noqa: C901 global transfer_data global task_id global archive_directory_listing + global global_variable_tarfiles_pushed logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") + logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}") if not transfer_client: globus_activate("globus://" + remote_ep) if not transfer_client: @@ -215,7 +220,7 @@ def globus_transfer( # noqa: C901 fail_on_quota_errors=True, ) transfer_data.add_item(src_path, dst_path) - transfer_data["label"] = subdir_label + " " + filename + transfer_data["label"] = label try: if task_id: task = transfer_client.get_task(task_id) @@ -225,12 +230,12 @@ def globus_transfer( # noqa: C901 # Presently, we do not, except inadvertantly (if status == PENDING) if prev_task_status == "ACTIVE": logger.info( - f"{ts_utc()}: 
Previous task_id {task_id} Still Active. Returning." + f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE." ) return "ACTIVE" elif prev_task_status == "SUCCEEDED": logger.info( - f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing." + f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED." ) src_ep = task["source_endpoint_id"] dst_ep = task["destination_endpoint_id"] @@ -243,7 +248,7 @@ def globus_transfer( # noqa: C901 ) else: logger.error( - f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing." + f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}." ) # DEBUG: review accumulated items in TransferData @@ -251,7 +256,11 @@ def globus_transfer( # noqa: C901 attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - print(f" source item: {item['source_path']}") + global_variable_tarfiles_pushed += 1 + print( + f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}", + flush=True, + ) # SUBMIT new transfer here logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") @@ -263,6 +272,7 @@ def globus_transfer( # noqa: C901 f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}" ) + # Nullify the submitted transfer data structure so that a new one will be created on next call. 
transfer_data = None except TransferAPIError as e: if e.code == "NoCredException": @@ -310,9 +320,13 @@ def globus_block_wait( while retry_count < max_retries: try: # Wait for the task to complete + logger.info( + f"{ts_utc()}: on task_wait try {retry_count+1} out of {max_retries}" + ) transfer_client.task_wait( task_id, timeout=wait_timeout, polling_interval=10 ) + logger.info(f"{ts_utc()}: done with wait") except Exception as e: logger.error(f"Unexpected Exception: {e}") else: @@ -350,7 +364,7 @@ def globus_wait(task_id: str): with 20 second timeout limit. If the task is ACTIVE after time runs out 'task_wait' returns False, and True otherwise. """ - while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20): + while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20): pass """ The Globus transfer job (task) has been finished (SUCCEEDED or FAILED). @@ -387,10 +401,24 @@ def globus_finalize(non_blocking: bool = False): global transfer_client global transfer_data global task_id + global global_variable_tarfiles_pushed last_task_id = None if transfer_data: + # DEBUG: review accumulated items in TransferData + logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:") + attribs = transfer_data.__dict__ + for item in attribs["data"]["DATA"]: + if item["DATA_TYPE"] == "transfer_item": + global_variable_tarfiles_pushed += 1 + print( + f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}", + flush=True, + ) + + # SUBMIT new transfer here + logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") try: last_task = submit_transfer_with_checks(transfer_data) last_task_id = last_task.get("task_id") diff --git a/zstash/hpss.py b/zstash/hpss.py index 44bc0f13..24603388 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -10,6 +10,9 @@ from .settings import get_db_filename, logger from .utils import run_command, ts_utc +prev_transfers: List[str] = list() 
+curr_transfers: List[str] = list() + def hpss_transfer( hpss: str, @@ -18,7 +21,18 @@ def hpss_transfer( cache: str, keep: bool = False, non_blocking: bool = False, + is_index: bool = False, ): + global prev_transfers + global curr_transfers + + logger.info( + f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {prev_transfers}" + ) + # logger.debug( + # f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {curr_transfers}" + # ) + if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) if transfer_type == "put" and file_path != get_db_filename(cache): @@ -71,6 +85,10 @@ def hpss_transfer( endpoint = url.netloc url_path = url.path + curr_transfers.append(file_path) + # logger.debug( + # f"{ts_utc()}: curr_transfers has been appended to, is now {curr_transfers}" + # ) path, name = os.path.split(file_path) # Need to be in local directory for `hsi` to work @@ -112,19 +130,33 @@ def hpss_transfer( if transfer_type == "put": if not keep: - if (scheme != "globus") or ( - globus_status == "SUCCEEDED" and not non_blocking - ): - os.remove(file_path) + if (scheme != "globus") or (globus_status == "SUCCEEDED"): + # Note: This is intended to fulfill the default removal of successfully-transferred + # tar files when keep=False, irrespective of non-blocking status + logger.debug( + f"{ts_utc()}: deleting transfered files {prev_transfers}" + ) + for src_path in prev_transfers: + os.remove(src_path) + prev_transfers = curr_transfers + curr_transfers = list() + logger.info( + f"{ts_utc()}: prev_transfers has been set to {prev_transfers}" + ) def hpss_put( - hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False + hpss: str, + file_path: str, + cache: str, + keep: bool = True, + non_blocking: bool = False, + is_index: bool = False, ): """ Put a file to the HPSS archive. 
""" - hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking) + hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking, is_index) def hpss_get(hpss: str, file_path: str, cache: str): diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 707f73a8..87325f4f 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -165,7 +165,7 @@ def add_files( # print(process.stdout) logger.info( - f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}" + f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname} [keep, non_blocking] = [{keep}, {non_blocking}]" ) hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) logger.info( diff --git a/zstash/update.py b/zstash/update.py index 56095897..9ab565b8 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -38,12 +38,12 @@ def update(): else: failures = result - # Transfer to HPSS. Always keep a local copy. + # Transfer to HPSS. Always keep a local copy of the database. if config.hpss is not None: hpss = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, get_db_filename(cache), cache, keep=True) + hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) globus_finalize(non_blocking=args.non_blocking) @@ -83,6 +83,13 @@ def setup_update() -> Tuple[argparse.Namespace, str]: help="dry run, only list files to be updated in archive", action="store_true", ) + optional.add_argument( + "--maxsize", + type=float, + help="maximum size of tar archives (in GB, default 256)", + default=256, + ) + optional.add_argument( "--keep", help='if --hpss is not "none", keep the tar files in the local archive (cache) after uploading to the HPSS archive. Default is to delete the tar files. If --hpss=none, this flag has no effect.', @@ -107,8 +114,16 @@ def setup_update() -> Tuple[argparse.Namespace, str]: help="Hard copy symlinks. This is useful for preventing broken links. 
Note that a broken link will result in a failed update.", ) args: argparse.Namespace = parser.parse_args(sys.argv[2:]) - if args.hpss and args.hpss.lower() == "none": + + if (not args.hpss) or (args.hpss.lower() == "none"): args.hpss = "none" + args.keep = True + + # Copy configuration + # config.path = os.path.abspath(args.path) + config.hpss = args.hpss + config.maxsize = int(1024 * 1024 * 1024 * args.maxsize) + cache: str if args.cache: cache = args.cache @@ -242,14 +257,28 @@ def update_database( # noqa: C901 try: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks + cur, + con, + itar, + newfiles, + cache, + keep, + args.follow_symlinks, + non_blocking=args.non_blocking, ) except FileNotFoundError: raise Exception("Archive update failed due to broken symlink.") else: # Add files failures = add_files( - cur, con, itar, newfiles, cache, keep, args.follow_symlinks + cur, + con, + itar, + newfiles, + cache, + keep, + args.follow_symlinks, + non_blocking=args.non_blocking, ) # Close database