From 99fabce7b8bbd49a9a7bd2beb5032c32639b6872 Mon Sep 17 00:00:00 2001 From: taoky Date: Tue, 27 Aug 2024 20:33:36 +0800 Subject: [PATCH] Parallelize verification step 4 Related: #4 --- shadowmire.py | 96 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 76 insertions(+), 20 deletions(-) diff --git a/shadowmire.py b/shadowmire.py index cc4c7d7..01b8ece 100755 --- a/shadowmire.py +++ b/shadowmire.py @@ -625,7 +625,7 @@ class SyncBase: if serial: self.local_db.set(package_name, serial) except Exception as e: - if isinstance(e, (ExitProgramException, KeyboardInterrupt)): + if isinstance(e, (KeyboardInterrupt)): raise logger.warning( "%s generated an exception", package_name, exc_info=True @@ -1204,25 +1204,81 @@ def verify( "====== Step 4. Remove any unreferenced files in `packages` folder ======" ) ref_set = set() - for sname in tqdm(simple_dirs, desc="Iterating simple/ directory"): - sd = basedir / "simple" / sname - hrefs = get_existing_hrefs(sd) - hrefs = [] if hrefs is None else hrefs - for i in hrefs: - # use normpath, which is much faster than pathlib resolve(), as it does not need to access fs - # we could make sure no symlinks could affect this here - np = normpath(sd / i) - logger.debug("add to ref_set: %s", np) - ref_set.add(np) - for file in tqdm( - (basedir / "packages").glob("*/*/*/*"), desc="Iterating packages/*/*/*/*" - ): - # basedir is absolute, so file is also absolute - # just convert to str to match normpath result - logger.debug("find file %s", file) - if str(file) not in ref_set: - logger.info("removing unreferenced file %s", file) - file.unlink() + with ThreadPoolExecutor(max_workers=IOWORKERS) as executor: + # Part 1: iterate simple/ + def iterate_simple(sname: str) -> list[str]: + sd = basedir / "simple" / sname + hrefs = get_existing_hrefs(sd) + hrefs = [] if hrefs is None else hrefs + nps = [] + for i in hrefs: + # use normpath, which is much faster than pathlib resolve(), as it does not need to access fs + # we could make sure no symlinks could affect this here + np = normpath(sd / i) + logger.debug("add to ref_set: %s", np) + # ref_set.add(np) + nps.append(np) + return nps + + futures = { + executor.submit(iterate_simple, sname): sname for sname in simple_dirs + } + try: + for future in tqdm( + as_completed(futures), + total=len(simple_dirs), + desc="Iterating simple/ directory", + ): + sname = futures[future] + try: + nps = future.result() + for np in nps: + ref_set.add(np) + except Exception as e: + if isinstance(e, (KeyboardInterrupt)): + raise + logger.warning("%s generated an exception", sname, exc_info=True) + success = False + except (ExitProgramException, KeyboardInterrupt): + # TODO: dup code in threading exception handler + logger.info("Get ExitProgramException or KeyboardInterrupt, exiting...") + for future in futures: + future.cancel() + sys.exit(1) + + # Part 2: iterate packages + def unlink_not_in_set(first_dirname: str, position: int) -> None: + for file in tqdm( + (basedir / "packages" / first_dirname).glob("*/*/*"), + desc=f"Iterating packages/{first_dirname}/*/*/*", + position=position, + ): + logger.debug("find file %s", file) + if str(file) not in ref_set: + logger.info("removing unreferenced file %s", file) + file.unlink() + + # MyPy does not enjoy same variable name with different types, even when --allow-redefinition + # Ignore here to make mypy happy + futures = { + executor.submit(unlink_not_in_set, first_dir.name, idx % IOWORKERS): first_dir.name # type: ignore + for idx, first_dir in enumerate(fast_iterdir((basedir / "packages"), "dir")) + } + try: + for future in as_completed(futures): + sname = futures[future] + try: + future.result() + except Exception as e: + if isinstance(e, (KeyboardInterrupt)): + raise + logger.warning("%s generated an exception", sname, exc_info=True) + success = False + except (ExitProgramException, KeyboardInterrupt): + logger.info("Get ExitProgramException or KeyboardInterrupt, exiting...") + for future in futures: + future.cancel() + sys.exit(1) logger.info("Verification finished. Success: %s", success)