Parallelize verification step 3 with threads

Related: #4
This commit is contained in:
taoky 2024-08-27 19:06:05 +08:00
parent d64ea61be7
commit 799a477336

View File

@ -36,6 +36,8 @@ USER_AGENT = "Shadowmire (https://github.com/taoky/shadowmire)"
# Note that it's suggested to use only 3 workers for PyPI. # Note that it's suggested to use only 3 workers for PyPI.
WORKERS = int(os.environ.get("SHADOWMIRE_WORKERS", "3")) WORKERS = int(os.environ.get("SHADOWMIRE_WORKERS", "3"))
# Use threads to parallelize verification local IO
IOWORKERS = int(os.environ.get("SHADOWMIRE_IOWORKERS", "5"))
# A safety net -- to avoid upstream issues causing too many packages removed when determining sync plan. # A safety net -- to avoid upstream issues causing too many packages removed when determining sync plan.
MAX_DELETION = int(os.environ.get("SHADOWMIRE_MAX_DELETION", "50000")) MAX_DELETION = int(os.environ.get("SHADOWMIRE_MAX_DELETION", "50000"))
@ -516,13 +518,11 @@ class SyncBase:
json_files: set[str], json_files: set[str],
compare_size: bool, compare_size: bool,
) -> bool: ) -> bool:
to_update = [] def is_consistent(package_name: str) -> bool:
for package_name in tqdm(package_names, desc="Checking consistency"):
if package_name not in json_files: if package_name not in json_files:
# save a newfstatat() when name already in json_files # save a newfstatat() when name already in json_files
logger.info("add %s as it does not have json API file", package_name) logger.info("add %s as it does not have json API file", package_name)
to_update.append(package_name) return False
continue
package_simple_path = self.simple_dir / package_name package_simple_path = self.simple_dir / package_name
html_simple = package_simple_path / "index.html" html_simple = package_simple_path / "index.html"
htmlv1_simple = package_simple_path / "index.v1_html" htmlv1_simple = package_simple_path / "index.v1_html"
@ -539,8 +539,7 @@ class SyncBase:
"add %s as it does not have index.v1_html or index.v1_json", "add %s as it does not have index.v1_html or index.v1_json",
package_name, package_name,
) )
to_update.append(package_name) return False
continue
if ( if (
hrefs_html is None hrefs_html is None
or hrefsize_json is None or hrefsize_json is None
@ -548,20 +547,17 @@ class SyncBase:
): ):
# something unexpected happens... # something unexpected happens...
logger.info("add %s as its indexes are not consistent", package_name) logger.info("add %s as its indexes are not consistent", package_name)
to_update.append(package_name) return False
continue
# OK, check if all hrefs have corresponding files # OK, check if all hrefs have corresponding files
if self.sync_packages: if self.sync_packages:
should_update = False
for href, size in hrefsize_json: for href, size in hrefsize_json:
dest = Path(normpath(package_simple_path / href)) dest = Path(normpath(package_simple_path / href))
try: try:
dest_stat = dest.stat() dest_stat = dest.stat()
except FileNotFoundError: except FileNotFoundError:
logger.info("add %s as it's missing packages", package_name) logger.info("add %s as it's missing packages", package_name)
should_update = True return False
break
if compare_size and size != -1: if compare_size and size != -1:
dest_size = dest_stat.st_size dest_size = dest_stat.st_size
if dest_size != size: if dest_size != size:
@ -571,10 +567,37 @@ class SyncBase:
dest_size, dest_size,
size, size,
) )
should_update = True return False
break return True
if should_update:
to_update.append(package_name) to_update = []
with ThreadPoolExecutor(max_workers=IOWORKERS) as executor:
futures = {
executor.submit(is_consistent, package_name): package_name
for package_name in package_names
}
try:
for future in tqdm(
as_completed(futures),
total=len(package_names),
desc="Checking consistency",
):
package_name = futures[future]
try:
consistent = future.result()
if not consistent:
to_update.append(package_name)
except Exception:
logger.warning(
"%s generated an exception", package_name, exc_info=True
)
raise
except:
logger.info("Get an exception, exiting...")
for future in futures:
future.cancel()
sys.exit(1)
logger.info("%s packages to update in check_and_update()", len(to_update)) logger.info("%s packages to update in check_and_update()", len(to_update))
return self.parallel_update(to_update, prerelease_excludes) return self.parallel_update(to_update, prerelease_excludes)