Parallelize verification step 4

Related: #4
This commit is contained in:
taoky 2024-08-27 20:33:36 +08:00
parent 9e57eb6f46
commit 99fabce7b8

View File

@ -625,7 +625,7 @@ class SyncBase:
if serial: if serial:
self.local_db.set(package_name, serial) self.local_db.set(package_name, serial)
except Exception as e: except Exception as e:
if isinstance(e, (ExitProgramException, KeyboardInterrupt)): if isinstance(e, (KeyboardInterrupt)):
raise raise
logger.warning( logger.warning(
"%s generated an exception", package_name, exc_info=True "%s generated an exception", package_name, exc_info=True
@ -1204,25 +1204,81 @@ def verify(
"====== Step 4. Remove any unreferenced files in `packages` folder ======" "====== Step 4. Remove any unreferenced files in `packages` folder ======"
) )
ref_set = set() ref_set = set()
for sname in tqdm(simple_dirs, desc="Iterating simple/ directory"): with ThreadPoolExecutor(max_workers=IOWORKERS) as executor:
sd = basedir / "simple" / sname # Part 1: iterate simple/
hrefs = get_existing_hrefs(sd) def iterate_simple(sname: str) -> list[str]:
hrefs = [] if hrefs is None else hrefs sd = basedir / "simple" / sname
for i in hrefs: hrefs = get_existing_hrefs(sd)
# use normpath, which is much faster than pathlib resolve(), as it does not need to access fs hrefs = [] if hrefs is None else hrefs
# we could make sure no symlinks could affect this here nps = []
np = normpath(sd / i) for i in hrefs:
logger.debug("add to ref_set: %s", np) # use normpath, which is much faster than pathlib resolve(), as it does not need to access fs
ref_set.add(np) # we could make sure no symlinks could affect this here
for file in tqdm( np = normpath(sd / i)
(basedir / "packages").glob("*/*/*/*"), desc="Iterating packages/*/*/*/*" logger.debug("add to ref_set: %s", np)
): # ref_set.add(np)
# basedir is absolute, so file is also absolute nps.append(np)
# just convert to str to match normpath result return nps
logger.debug("find file %s", file)
if str(file) not in ref_set: futures = {
logger.info("removing unreferenced file %s", file) executor.submit(iterate_simple, sname): sname for sname in simple_dirs
file.unlink() }
try:
for future in tqdm(
as_completed(futures),
total=len(simple_dirs),
desc="Iterating simple/ directory",
):
sname = futures[future]
try:
nps = future.result()
for np in nps:
ref_set.add(np)
except Exception as e:
if isinstance(e, (KeyboardInterrupt)):
raise
logger.warning("%s generated an exception", sname, exc_info=True)
success = False
except (ExitProgramException, KeyboardInterrupt):
# TODO: dup code in threading exception handler
logger.info("Get ExitProgramException or KeyboardInterrupt, exiting...")
for future in futures:
future.cancel()
sys.exit(1)
# Part 2: iterate packages
def unlink_not_in_set(first_dirname: str, position: int) -> None:
for file in tqdm(
(basedir / "packages" / first_dirname).glob("*/*/*"),
desc=f"Iterating packages/{first_dirname}/*/*/*",
position=position,
):
logger.debug("find file %s", file)
if str(file) not in ref_set:
logger.info("removing unreferenced file %s", file)
file.unlink()
# MyPy does not enjoy same variable name with different types, even when --allow-redefinition
# Ignore here to make mypy happy
futures = {
executor.submit(unlink_not_in_set, first_dir.name, idx % IOWORKERS): first_dir.name # type: ignore
for idx, first_dir in enumerate(fast_iterdir((basedir / "packages"), "dir"))
}
try:
for future in as_completed(futures):
sname = futures[future]
try:
future.result()
except Exception as e:
if isinstance(e, (KeyboardInterrupt)):
raise
logger.warning("%s generated an exception", sname, exc_info=True)
success = False
except (ExitProgramException, KeyboardInterrupt):
logger.info("Get ExitProgramException or KeyboardInterrupt, exiting...")
for future in futures:
future.cancel()
sys.exit(1)
logger.info("Verification finished. Success: %s", success) logger.info("Verification finished. Success: %s", success)