mirror of
https://github.com/taoky/shadowmire.git
synced 2025-07-08 09:12:43 +00:00
Use click to replace argparse
Make it simpler and easier to maintain command line arguments.
This commit is contained in:
parent
8d2460cf73
commit
c695419700
@ -1,2 +1,3 @@
|
||||
requests==2.32.3
|
||||
tqdm==4.66.4
|
||||
click==8.1.7
|
||||
|
253
shadowmire.py
253
shadowmire.py
@ -10,18 +10,19 @@ from pathlib import Path
|
||||
from html.parser import HTMLParser
|
||||
import logging
|
||||
import html
|
||||
import argparse
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
import sqlite3
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import signal
|
||||
import requests
|
||||
import click
|
||||
from tqdm import tqdm
|
||||
from requests.adapters import HTTPAdapter, Retry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
USER_AGENT = "Shadowmire (https://github.com/taoky/shadowmire)"
|
||||
|
||||
# Note that it's suggested to use only 3 workers for PyPI.
|
||||
@ -32,6 +33,21 @@ if WORKERS > 10:
|
||||
)
|
||||
logger.warning("Don't blame me if you were banned!")
|
||||
|
||||
# https://github.com/pypa/bandersnatch/blob/a05af547f8d1958217ef0dc0028890b1839e6116/src/bandersnatch_filter_plugins/prerelease_name.py#L18C1-L23C6
|
||||
PRERELEASE_PATTERNS = (
|
||||
re.compile(r".+rc\d+$"),
|
||||
re.compile(r".+a(lpha)?\d+$"),
|
||||
re.compile(r".+b(eta)?\d+$"),
|
||||
re.compile(r".+dev\d+$"),
|
||||
)
|
||||
|
||||
|
||||
def is_version_prerelease(version: str) -> bool:
|
||||
for p in PRERELEASE_PATTERNS:
|
||||
if p.match(version):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class PackageNotFoundError(Exception):
|
||||
pass
|
||||
@ -195,8 +211,8 @@ class CustomXMLRPCTransport(xmlrpc.client.Transport):
|
||||
def create_requests_session() -> requests.Session:
|
||||
s = requests.Session()
|
||||
retries = Retry(total=3, backoff_factor=0.1)
|
||||
s.mount('http://', HTTPAdapter(max_retries=retries))
|
||||
s.mount('https://', HTTPAdapter(max_retries=retries))
|
||||
s.mount("http://", HTTPAdapter(max_retries=retries))
|
||||
s.mount("https://", HTTPAdapter(max_retries=retries))
|
||||
s.headers.update({"User-Agent": USER_AGENT})
|
||||
return s
|
||||
|
||||
@ -639,120 +655,137 @@ def get_local_serial(package_simple_path: Path) -> Optional[int]:
|
||||
return None
|
||||
|
||||
|
||||
def main(args: argparse.Namespace) -> None:
|
||||
def sync_shared_args(func):
|
||||
shared_options = [
|
||||
click.option(
|
||||
"--sync-packages",
|
||||
is_flag=True,
|
||||
help="Sync packages instead of just indexes",
|
||||
),
|
||||
click.option(
|
||||
"--shadowmire-upstream",
|
||||
required=False,
|
||||
type=str,
|
||||
help="Use another upstream using shadowmire instead of PyPI",
|
||||
),
|
||||
click.option(
|
||||
"--exclude", multiple=True, help="Remote package names to exclude. Regex."
|
||||
),
|
||||
]
|
||||
for option in shared_options[::-1]:
|
||||
func = option(func)
|
||||
return func
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context) -> None:
|
||||
log_level = logging.DEBUG if os.environ.get("DEBUG") else logging.INFO
|
||||
logging.basicConfig(level=log_level)
|
||||
logger.debug(args)
|
||||
ctx.ensure_object(dict)
|
||||
|
||||
basedir = Path(os.environ.get("REPO", "."))
|
||||
local_db = LocalVersionKV(basedir / "local.db", basedir / "local.json")
|
||||
|
||||
sync: SyncBase
|
||||
if args.command == "sync":
|
||||
if args.shadowmire_upstream:
|
||||
sync = SyncPlainHTTP(
|
||||
upstream=args.shadowmire_upstream,
|
||||
basedir=basedir,
|
||||
local_db=local_db,
|
||||
sync_packages=args.sync_packages,
|
||||
)
|
||||
else:
|
||||
sync = SyncPyPI(
|
||||
basedir=basedir, local_db=local_db, sync_packages=args.sync_packages
|
||||
)
|
||||
local = local_db.dump()
|
||||
plan = sync.determine_sync_plan(local, args.excludes)
|
||||
# save plan for debugging
|
||||
with overwrite(basedir / "plan.json") as f:
|
||||
json.dump(plan, f, default=vars)
|
||||
sync.do_sync_plan(plan)
|
||||
sync.finalize()
|
||||
elif args.command == "genlocal":
|
||||
local = {}
|
||||
for package_path in (basedir / "simple").iterdir():
|
||||
package_name = package_path.name
|
||||
serial = get_local_serial(package_path)
|
||||
if serial:
|
||||
local[package_name] = serial
|
||||
local_db.nuke(commit=False)
|
||||
local_db.batch_set(local)
|
||||
local_db.dump_json()
|
||||
elif args.command == "verify":
|
||||
if args.shadowmire_upstream:
|
||||
sync = SyncPlainHTTP(
|
||||
upstream=args.shadowmire_upstream,
|
||||
basedir=basedir,
|
||||
local_db=local_db,
|
||||
sync_packages=args.sync_packages,
|
||||
)
|
||||
else:
|
||||
sync = SyncPyPI(
|
||||
basedir=basedir, local_db=local_db, sync_packages=args.sync_packages
|
||||
)
|
||||
local_names = set(local_db.keys())
|
||||
simple_dirs = set(
|
||||
[i.name for i in (basedir / "simple").iterdir() if i.is_dir()]
|
||||
ctx.obj["basedir"] = basedir
|
||||
ctx.obj["local_db"] = local_db
|
||||
|
||||
|
||||
def exclude_to_excludes(exclude: tuple[str]) -> list[re.Pattern]:
|
||||
return [re.compile(i) for i in exclude]
|
||||
|
||||
|
||||
def get_syncer(
|
||||
basedir: Path,
|
||||
local_db: LocalVersionKV,
|
||||
sync_packages: bool,
|
||||
shadowmire_upstream: Optional[str],
|
||||
) -> SyncBase:
|
||||
syncer: SyncBase
|
||||
if shadowmire_upstream:
|
||||
syncer = SyncPlainHTTP(
|
||||
upstream=shadowmire_upstream,
|
||||
basedir=basedir,
|
||||
local_db=local_db,
|
||||
sync_packages=sync_packages,
|
||||
)
|
||||
for package_name in simple_dirs - local_names:
|
||||
sync.do_remove(package_name)
|
||||
sync.parallel_update(list(local_names))
|
||||
sync.finalize()
|
||||
# clean up unreferenced package files
|
||||
ref_set = set()
|
||||
for sname in simple_dirs:
|
||||
sd = basedir / "simple" / sname
|
||||
hrefs = get_existing_hrefs(sd)
|
||||
for i in hrefs:
|
||||
ref_set.add(str((sd / i).resolve()))
|
||||
for file in (basedir / "packages").glob("*/*/*/*"):
|
||||
file = file.resolve()
|
||||
if str(file) not in ref_set:
|
||||
logger.info("removing unreferenced %s", file)
|
||||
file.unlink()
|
||||
else:
|
||||
syncer = SyncPyPI(
|
||||
basedir=basedir, local_db=local_db, sync_packages=sync_packages
|
||||
)
|
||||
return syncer
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
@sync_shared_args
|
||||
def sync(
|
||||
ctx: click.Context,
|
||||
sync_packages: bool,
|
||||
shadowmire_upstream: Optional[str],
|
||||
exclude: tuple[str],
|
||||
) -> None:
|
||||
basedir = ctx.obj["basedir"]
|
||||
local_db = ctx.obj["local_db"]
|
||||
excludes = exclude_to_excludes(exclude)
|
||||
syncer = get_syncer(basedir, local_db, sync_packages, shadowmire_upstream)
|
||||
local = local_db.dump()
|
||||
plan = syncer.determine_sync_plan(local, excludes)
|
||||
# save plan for debugging
|
||||
with overwrite(basedir / "plan.json") as f:
|
||||
json.dump(plan, f, default=vars)
|
||||
syncer.do_sync_plan(plan)
|
||||
syncer.finalize()
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def genlocal(ctx: click.Context) -> None:
|
||||
basedir = ctx.obj["basedir"]
|
||||
local_db = ctx.obj["local_db"]
|
||||
local = {}
|
||||
for package_path in (basedir / "simple").iterdir():
|
||||
package_name = package_path.name
|
||||
serial = get_local_serial(package_path)
|
||||
if serial:
|
||||
local[package_name] = serial
|
||||
local_db.nuke(commit=False)
|
||||
local_db.batch_set(local)
|
||||
local_db.dump_json()
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
@sync_shared_args
|
||||
def verify(
|
||||
ctx: click.Context,
|
||||
sync_packages: bool,
|
||||
shadowmire_upstream: Optional[str],
|
||||
exclude: tuple[str],
|
||||
) -> None:
|
||||
basedir = ctx.obj["basedir"]
|
||||
local_db = ctx.obj["local_db"]
|
||||
excludes = exclude_to_excludes(exclude)
|
||||
syncer = get_syncer(basedir, local_db, sync_packages, shadowmire_upstream)
|
||||
local_names = set(local_db.keys())
|
||||
simple_dirs = set([i.name for i in (basedir / "simple").iterdir() if i.is_dir()])
|
||||
for package_name in simple_dirs - local_names:
|
||||
syncer.do_remove(package_name)
|
||||
syncer.parallel_update(list(local_names))
|
||||
syncer.finalize()
|
||||
# clean up unreferenced package files
|
||||
ref_set = set()
|
||||
for sname in simple_dirs:
|
||||
sd = basedir / "simple" / sname
|
||||
hrefs = get_existing_hrefs(sd)
|
||||
for i in hrefs:
|
||||
ref_set.add(str((sd / i).resolve()))
|
||||
for file in (basedir / "packages").glob("*/*/*/*"):
|
||||
file = file.resolve()
|
||||
if str(file) not in ref_set:
|
||||
logger.info("removing unreferenced %s", file)
|
||||
file.unlink()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser("shadowmire: lightweight PyPI syncing tool")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
parser_sync = subparsers.add_parser("sync", help="Sync from upstream")
|
||||
parser_sync.add_argument(
|
||||
"--sync-packages",
|
||||
help="Sync packages instead of just indexes",
|
||||
action="store_true",
|
||||
)
|
||||
parser_sync.add_argument(
|
||||
"--exclude", help="Remote package names to exclude. Regex.", nargs="*"
|
||||
)
|
||||
parser_sync.add_argument(
|
||||
"--shadowmire-upstream",
|
||||
help="Use another upstream using shadowmire instead of PyPI",
|
||||
type=str,
|
||||
)
|
||||
parser_genlocal = subparsers.add_parser(
|
||||
"genlocal", help="(Re)generate local db and json from simple/"
|
||||
)
|
||||
parser_verify = subparsers.add_parser(
|
||||
"verify",
|
||||
help="Verify existing sync from local db, download missing things, remove unreferenced packages",
|
||||
)
|
||||
parser_verify.add_argument(
|
||||
"--sync-packages",
|
||||
help="Sync packages instead of just indexes",
|
||||
action="store_true",
|
||||
)
|
||||
parser_verify.add_argument(
|
||||
"--shadowmire-upstream",
|
||||
help="Use another upstream using shadowmire instead of PyPI",
|
||||
type=str,
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.command is None:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
if args.command == "sync" and args.exclude:
|
||||
args.excludes = [re.compile(i) for i in args.exclude]
|
||||
else:
|
||||
args.excludes = []
|
||||
main(args)
|
||||
cli(obj={})
|
||||
|
Loading…
x
Reference in New Issue
Block a user