diff --git a/shadowmire.py b/shadowmire.py index 2bb13d4..b9ddd47 100755 --- a/shadowmire.py +++ b/shadowmire.py @@ -36,6 +36,8 @@ USER_AGENT = "Shadowmire (https://github.com/taoky/shadowmire)" # Note that it's suggested to use only 3 workers for PyPI. WORKERS = int(os.environ.get("SHADOWMIRE_WORKERS", "3")) +# Use threads to parallelize verification local IO +IOWORKERS = int(os.environ.get("SHADOWMIRE_IOWORKERS", "5")) # A safety net -- to avoid upstream issues casuing too many packages removed when determinating sync plan. MAX_DELETION = int(os.environ.get("SHADOWMIRE_MAX_DELETION", "50000")) @@ -516,13 +518,11 @@ class SyncBase: json_files: set[str], compare_size: bool, ) -> bool: - to_update = [] - for package_name in tqdm(package_names, desc="Checking consistency"): + def is_consistent(package_name: str) -> bool: if package_name not in json_files: # save a newfstatat() when name already in json_files logger.info("add %s as it does not have json API file", package_name) - to_update.append(package_name) - continue + return False package_simple_path = self.simple_dir / package_name html_simple = package_simple_path / "index.html" htmlv1_simple = package_simple_path / "index.v1_html" @@ -539,8 +539,7 @@ class SyncBase: "add %s as it does not have index.v1_html or index.v1_json", package_name, ) - to_update.append(package_name) - continue + return False if ( hrefs_html is None or hrefsize_json is None @@ -548,20 +547,17 @@ class SyncBase: ): # something unexpected happens... logger.info("add %s as its indexes are not consistent", package_name) - to_update.append(package_name) - continue + return False # OK, check if all hrefs have corresponding files if self.sync_packages: - should_update = False for href, size in hrefsize_json: dest = Path(normpath(package_simple_path / href)) try: dest_stat = dest.stat() except FileNotFoundError: logger.info("add %s as it's missing packages", package_name) - should_update = True - break + return False if compare_size and size != -1: dest_size = dest_stat.st_size if dest_size != size: @@ -571,10 +567,37 @@ class SyncBase: dest_size, size, ) - should_update = True - break - if should_update: - to_update.append(package_name) + return False + return True + + to_update = [] + with ThreadPoolExecutor(max_workers=IOWORKERS) as executor: + futures = { + executor.submit(is_consistent, package_name): package_name + for package_name in package_names + } + try: + for future in tqdm( + as_completed(futures), + total=len(package_names), + desc="Checking consistency", + ): + package_name = futures[future] + try: + consistent = future.result() + if not consistent: + to_update.append(package_name) + except Exception: + logger.warning( + "%s generated an exception", package_name, exc_info=True + ) + raise + except: + logger.info("Get an exception, exiting...") + for future in futures: + future.cancel() + sys.exit(1) + logger.info("%s packages to update in check_and_update()", len(to_update)) return self.parallel_update(to_update, prerelease_excludes)