Run formatter on some Python scripts

Signed-off-by: Harry Chen <i@harrychen.xyz>
Harry Chen 2024-08-17 13:33:31 +08:00
parent f8afa1f57c
commit 5f4bc1c260
No known key found for this signature in database
6 changed files with 449 additions and 258 deletions
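Note: the hunks below wrap a few hand-aligned tuples in "# fmt: off" / "# fmt: on" marker comments. Those markers are the directive Black uses to leave a region untouched, so the formatter is presumably Black with default settings (the commit message does not name it). A minimal sketch of the effect, using names taken from the diff below:

# fmt: off
CONDA_REPOS = ("main", "free", "r", "msys2")
CONDA_ARCHES = (
    "noarch", "linux-64", "linux-32",  # hand-made alignment inside the markers is kept verbatim
)
# fmt: on


def sizeof_fmt(num, suffix="iB"):
    # outside the markers the formatter normalizes string quotes to double quotes
    # and rewraps long lines to its line-length limit
    return "%.2f%s%s" % (num, "Y", suffix)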

View File

@@ -1,17 +1,11 @@
#!/usr/bin/env python3
import hashlib
-import traceback
-import json
import os
-import re
-import shutil
import subprocess as sp
-import tempfile
-import argparse
import time
from email.utils import parsedate_to_datetime
from pathlib import Path
-from typing import List, Set, Tuple, IO
from typing import Set
import requests

DOWNLOAD_TIMEOUT = int(os.getenv('DOWNLOAD_TIMEOUT', '1800'))

View File

@@ -25,6 +25,7 @@ CONDA_CLOUD_BASE_URL = os.getenv("CONDA_COULD_URL", "https://conda.anaconda.org"
WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
# fmt: off
CONDA_REPOS = ("main", "free", "r", "msys2")
CONDA_ARCHES = (
    "noarch", "linux-64", "linux-32", "linux-aarch64", "linux-armv6l", "linux-armv7l",
@@ -72,6 +73,7 @@ CONDA_CLOUD_REPOS = (
EXCLUDED_PACKAGES = (
    "pytorch-nightly", "pytorch-nightly-cpu", "ignite-nightly",
)
# fmt: on

# connect and read timeout value
TIMEOUT_OPTION = (7, 10)
@@ -84,16 +86,18 @@
    format="[%(asctime)s] [%(levelname)s] %(message)s",
)

def sizeof_fmt(num, suffix="iB"):
    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
        if abs(num) < 1024.0:
            return "%3.2f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.2f%s%s" % (num, "Y", suffix)

def md5_check(file: Path, md5: str = None):
    m = hashlib.md5()
    with file.open("rb") as f:
        while True:
            buf = f.read(1 * 1024 * 1024)
            if not buf:
@@ -101,9 +105,10 @@ def md5_check(file: Path, md5: str = None):
            m.update(buf)
    return m.hexdigest() == md5

def sha256_check(file: Path, sha256: str = None):
    m = hashlib.sha256()
    with file.open("rb") as f:
        while True:
            buf = f.read(1 * 1024 * 1024)
            if not buf:
@@ -113,34 +118,42 @@ def sha256_check(file: Path, sha256: str = None):
def curl_download(remote_url: str, dst_file: Path, sha256: str = None, md5: str = None):
    # fmt: off
    sp.check_call(
        [
            "curl", "-o", str(dst_file),
            "-sL", "--remote-time", "--show-error",
            "--fail", "--retry", "10",
            "--speed-time", "15",
            "--speed-limit", "5000",
            remote_url,
        ]
    )
    # fmt: on
    if sha256 and (not sha256_check(dst_file, sha256)):
        return "SHA256 mismatch"
    if md5 and (not md5_check(dst_file, md5)):
        return "MD5 mismatch"

def sync_repo(
    repo_url: str, local_dir: Path, tmpdir: Path, delete: bool, remove_legacy: bool
):
    logging.info("Start syncing {}".format(repo_url))
    local_dir.mkdir(parents=True, exist_ok=True)
    repodata_url = repo_url + "/repodata.json"
    bz2_repodata_url = repo_url + "/repodata.json.bz2"
    # https://github.com/conda/conda/issues/13256, from conda 24.1.x
    zst_repodata_url = repo_url + "/repodata.json.zst"
    # https://docs.conda.io/projects/conda-build/en/latest/release-notes.html
    # "current_repodata.json" - like repodata.json, but only has the newest version of each file
    current_repodata_url = repo_url + "/current_repodata.json"
    tmp_repodata = tmpdir / "repodata.json"
    tmp_bz2_repodata = tmpdir / "repodata.json.bz2"
    tmp_zst_repodata = tmpdir / "repodata.json.zst"
    tmp_current_repodata = tmpdir / "current_repodata.json"
    curl_download(repodata_url, tmp_repodata)
    curl_download(bz2_repodata_url, tmp_bz2_repodata)
@@ -158,31 +171,33 @@ def sync_repo(repo_url: str, local_dir: Path, tmpdir: Path, delete: bool, remove
    remote_filelist = []
    total_size = 0
    legacy_packages = repodata["packages"]
    conda_packages = repodata.get("packages.conda", {})
    if remove_legacy:
        # https://github.com/anaconda/conda/blob/0dbf85e0546e0b0dc060c8265ec936591ccbe980/conda/core/subdir_data.py#L440-L442
        use_legacy_packages = set(legacy_packages.keys()) - set(
            k[:-6] + ".tar.bz2" for k in conda_packages.keys()
        )
        legacy_packages = {k: legacy_packages[k] for k in use_legacy_packages}
    packages = {**legacy_packages, **conda_packages}
    for filename, meta in packages.items():
        if meta["name"] in EXCLUDED_PACKAGES:
            continue
        file_size = meta["size"]
        # prefer sha256 over md5
        sha256 = None
        md5 = None
        if "sha256" in meta:
            sha256 = meta["sha256"]
        elif "md5" in meta:
            md5 = meta["md5"]
        total_size += file_size
        pkg_url = "/".join([repo_url, filename])
        dst_file = local_dir / filename
        dst_file_wip = local_dir / (".downloading." + filename)
        remote_filelist.append(dst_file)
        if dst_file.is_file():
@@ -202,7 +217,7 @@ def sync_repo(repo_url: str, local_dir: Path, tmpdir: Path, delete: bool, remove
                if err is None:
                    dst_file_wip.rename(dst_file)
            except sp.CalledProcessError:
                err = "CalledProcessError"
            if err is None:
                break
            logging.error("Failed to download {}: {}".format(filename, err))
@@ -223,11 +238,15 @@ def sync_repo(repo_url: str, local_dir: Path, tmpdir: Path, delete: bool, remove
    tmp_current_repodata_gz_gened = False
    if tmp_current_repodata.is_file():
        if os.path.getsize(tmp_current_repodata) > GEN_METADATA_JSON_GZIP_THRESHOLD:
            sp.check_call(
                ["gzip", "--no-name", "--keep", "--", str(tmp_current_repodata)]
            )
            shutil.move(
                str(tmp_current_repodata) + ".gz",
                str(local_dir / "current_repodata.json.gz"),
            )
            tmp_current_repodata_gz_gened = True
        shutil.move(str(tmp_current_repodata), str(local_dir / "current_repodata.json"))
    if not tmp_current_repodata_gz_gened:
        # If the gzip file is not generated, remove the dangling gzip archive
        Path(local_dir / "current_repodata.json.gz").unlink(missing_ok=True)
@@ -235,9 +254,9 @@ def sync_repo(repo_url: str, local_dir: Path, tmpdir: Path, delete: bool, remove
    if delete:
        local_filelist = []
        delete_count = 0
        for i in local_dir.glob("*.tar.bz2"):
            local_filelist.append(i)
        for i in local_dir.glob("*.conda"):
            local_filelist.append(i)
        for i in set(local_filelist) - set(remote_filelist):
            logging.info("Deleting {}".format(i))
@@ -245,10 +264,14 @@ def sync_repo(repo_url: str, local_dir: Path, tmpdir: Path, delete: bool, remove
            delete_count += 1
        logging.info("{} files deleted".format(delete_count))
    logging.info(
        "{}: {} files, {} in total".format(
            repodata_url, len(remote_filelist), sizeof_fmt(total_size)
        )
    )
    return total_size

def sync_installer(repo_url, local_dir: Path):
    logging.info("Start syncing {}".format(repo_url))
    local_dir.mkdir(parents=True, exist_ok=True)
@@ -257,34 +280,37 @@ def sync_installer(repo_url, local_dir: Path):
    def remote_list():
        r = requests.get(repo_url, timeout=TIMEOUT_OPTION)
        d = pq(r.content)
        for tr in d("table").find("tr"):
            tds = pq(tr).find("td")
            if len(tds) != 4:
                continue
            fname = tds[0].find("a").text
            sha256 = tds[3].text
            if sha256 == "<directory>" or len(sha256) != 64:
                continue
            yield (fname, sha256)

    for filename, sha256 in remote_list():
        pkg_url = "/".join([repo_url, filename])
        dst_file = local_dir / filename
        dst_file_wip = local_dir / (".downloading." + filename)
        if dst_file.is_file():
            r = requests.head(pkg_url, allow_redirects=True, timeout=TIMEOUT_OPTION)
            len_avail = "content-length" in r.headers
            if len_avail:
                remote_filesize = int(r.headers["content-length"])
            remote_date = parsedate_to_datetime(r.headers["last-modified"])
            stat = dst_file.stat()
            local_filesize = stat.st_size
            local_mtime = stat.st_mtime
            # Do content verification on ~5% of files (see issue #25)
            if (
                (not len_avail or remote_filesize == local_filesize)
                and remote_date.timestamp() == local_mtime
                and (random.random() < 0.95 or sha256_check(dst_file, sha256))
            ):
                logging.info("Skipping {}".format(filename))
                # Stop the scanning if the most recent version is present
@@ -299,25 +325,31 @@ def sync_installer(repo_url, local_dir: Path):
        for retry in range(3):
            logging.info("Downloading {}".format(filename))
            err = ""
            try:
                err = curl_download(pkg_url, dst_file_wip, sha256=sha256)
                if err is None:
                    dst_file_wip.rename(dst_file)
            except sp.CalledProcessError:
                err = "CalledProcessError"
            if err is None:
                break
            logging.error("Failed to download {}: {}".format(filename, err))

def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--working-dir", default=WORKING_DIR)
    parser.add_argument(
        "--delete", action="store_true", help="delete unreferenced package files"
    )
    parser.add_argument(
        "--remove-legacy",
        action="store_true",
        help="delete legacy packages which have conda counterpart. Requires client conda >= 4.7.0",
    )
    args = parser.parse_args()
    if args.working_dir is None:
@@ -336,7 +368,8 @@ def main():
        try:
            sync_installer(remote_url, local_dir)
            size_statistics += sum(
                f.stat().st_size for f in local_dir.glob("*") if f.is_file()
            )
        except Exception:
            logging.exception("Failed to sync installers of {}".format(dist))
            success = False
@@ -348,8 +381,9 @@ def main():
            tmpdir = tempfile.mkdtemp()
            try:
                size_statistics += sync_repo(
                    remote_url, local_dir, Path(tmpdir), args.delete, args.remove_legacy
                )
            except Exception:
                logging.exception("Failed to sync repo: {}/{}".format(repo, arch))
                success = False
@@ -362,8 +396,9 @@ def main():
        tmpdir = tempfile.mkdtemp()
        try:
            size_statistics += sync_repo(
                remote_url, local_dir, Path(tmpdir), args.delete, args.remove_legacy
            )
        except Exception:
            logging.exception("Failed to sync repo: {}".format(repo))
            success = False
@@ -374,6 +409,7 @@ def main():
    if not success:
        sys.exit(1)

if __name__ == "__main__":
    main()

View File

@@ -4,7 +4,6 @@ import traceback
import os
import re
import shutil
-import subprocess as sp
import argparse
import bz2
import gzip
@@ -23,21 +22,27 @@ requests.utils.default_user_agent = lambda: APT_SYNC_USER_AGENT
# set preferred address family
import requests.packages.urllib3.util.connection as urllib3_cn

USE_ADDR_FAMILY = os.getenv("USE_ADDR_FAMILY", "").strip().lower()
if USE_ADDR_FAMILY != "":
    assert USE_ADDR_FAMILY in [
        "ipv4",
        "ipv6",
    ], "USE_ADDR_FAMILY must be either ipv4 or ipv6"
    urllib3_cn.allowed_gai_family = lambda: (
        socket.AF_INET if USE_ADDR_FAMILY == "ipv4" else socket.AF_INET6
    )

OS_TEMPLATE = {
    "ubuntu-lts": ["focal", "jammy", "noble"],
    "debian-current": ["bullseye", "bookworm"],
    "debian-latest2": ["bullseye", "bookworm"],
    "debian-latest": ["bookworm"],
}
ARCH_NO_PKGIDX = ["dep11", "i18n", "cnf"]
MAX_RETRY = int(os.getenv("MAX_RETRY", "3"))
DOWNLOAD_TIMEOUT = int(os.getenv("DOWNLOAD_TIMEOUT", "1800"))
REPO_SIZE_FILE = os.getenv("REPO_SIZE_FILE", "")
pattern_os_template = re.compile(r"@\{(.+)\}")
pattern_package_name = re.compile(r"^Filename: (.+)$", re.MULTILINE)
@@ -45,11 +50,13 @@ pattern_package_size = re.compile(r"^Size: (\d+)$", re.MULTILINE)
pattern_package_sha256 = re.compile(r"^SHA256: (\w{64})$", re.MULTILINE)
download_cache = dict()

def check_args(prop: str, lst: List[str]):
    for s in lst:
        if len(s) == 0 or " " in s:
            raise ValueError(f"Invalid item in {prop}: {repr(s)}")

def replace_os_template(os_list: List[str]) -> List[str]:
    ret = []
    for i in os_list:
@@ -57,46 +64,54 @@ def replace_os_template(os_list: List[str]) -> List[str]:
        if matched:
            for os in OS_TEMPLATE[matched.group(1)]:
                ret.append(pattern_os_template.sub(os, i))
        elif i.startswith("@"):
            ret.extend(OS_TEMPLATE[i[1:]])
        else:
            ret.append(i)
    return ret

def check_and_download(url: str, dst_file: Path, caching=False) -> int:
    try:
        if caching:
            if url in download_cache:
                print(f"Using cached content: {url}", flush=True)
                with dst_file.open("wb") as f:
                    f.write(download_cache[url])
                return 0
            download_cache[url] = bytes()
        start = time.time()
        with requests.get(url, stream=True, timeout=(5, 10)) as r:
            r.raise_for_status()
            if "last-modified" in r.headers:
                remote_ts = parsedate_to_datetime(
                    r.headers["last-modified"]
                ).timestamp()
            else:
                remote_ts = None
            with dst_file.open("wb") as f:
                for chunk in r.iter_content(chunk_size=1024**2):
                    if time.time() - start > DOWNLOAD_TIMEOUT:
                        raise TimeoutError("Download timeout")
                    if not chunk:
                        continue  # filter out keep-alive new chunks
                    f.write(chunk)
                    if caching:
                        download_cache[url] += chunk
            if remote_ts is not None:
                os.utime(dst_file, (remote_ts, remote_ts))
        return 0
    except BaseException as e:
        print(e, flush=True)
        if dst_file.is_file():
            dst_file.unlink()
        if url in download_cache:
            del download_cache[url]
        return 1

def mkdir_with_dot_tmp(folder: Path) -> Tuple[Path, Path]:
    tmpdir = folder / ".tmp"
    if tmpdir.is_dir():
@@ -104,9 +119,10 @@ def mkdir_with_dot_tmp(folder: Path)->Tuple[Path, Path]:
    tmpdir.mkdir(parents=True, exist_ok=True)
    return (folder, tmpdir)

def move_files_in(src: Path, dst: Path):
    empty = True
    for file in src.glob("*"):
        empty = False
        print(f"moving {file} to {dst}")
        # shutil.move(str(file), str(dst))
@@ -119,7 +135,15 @@ def move_files_in(src: Path, dst: Path):
    if empty:
        print(f"{src} is empty")

def apt_mirror(
    base_url: str,
    dist: str,
    repo: str,
    arch: str,
    dest_base_dir: Path,
    deb_set: Dict[str, int],
) -> int:
    if not dest_base_dir.is_dir():
        print("Destination directory is empty, cannot continue")
        return 1
@@ -127,14 +151,27 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
    # download Release files
    dist_dir, dist_tmp_dir = mkdir_with_dot_tmp(dest_base_dir / "dists" / dist)
    check_and_download(
        f"{base_url}/dists/{dist}/InRelease", dist_tmp_dir / "InRelease", caching=True
    )
    if (
        check_and_download(
            f"{base_url}/dists/{dist}/Release", dist_tmp_dir / "Release", caching=True
        )
        != 0
    ):
        print("Invalid Repository")
        if not (dist_dir / "Release").is_file():
            print(
                f"{dist_dir/'Release'} never existed, upstream may not provide packages for {dist}, ignore this error"
            )
            return 0
        return 1
    check_and_download(
        f"{base_url}/dists/{dist}/Release.gpg",
        dist_tmp_dir / "Release.gpg",
        caching=True,
    )

    comp_dir, comp_tmp_dir = mkdir_with_dot_tmp(dist_dir / repo)
@@ -148,12 +185,16 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
        for line in fd:
            if cnt_start:
                fields = line.split()
                if (
                    len(fields) != 3 or len(fields[0]) != 64
                ):  # 64 is SHA-256 checksum length
                    break
                checksum, filesize, filename = tuple(fields)
                if (
                    filename.startswith(f"{repo}/{arch_dir}/")
                    or filename.startswith(f"{repo}/Contents-{arch}")
                    or filename.startswith(f"Contents-{arch}")
                ):
                    fn = Path(filename)
                    if len(fn.parts) <= 3:
                        # Contents-amd64.gz
@@ -163,7 +204,13 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
                    else:
                        # main/dep11/by-hash/MD5Sum/0af5c69679a24671cfd7579095a9cb5e
                        # deep_tmp_dir is in pkgidx_tmp_dir hence no extra garbage collection needed
                        deep_tmp_dir = (
                            dist_dir
                            / Path(fn.parts[0])
                            / Path(fn.parts[1])
                            / ".tmp"
                            / Path("/".join(fn.parts[2:-1]))
                        )
                        deep_tmp_dir.mkdir(parents=True, exist_ok=True)
                        pkgidx_file = deep_tmp_dir / fn.name
                else:
@@ -174,33 +221,41 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
                    print("Failed to download:", pkglist_url)
                    continue
                with pkgidx_file.open("rb") as t:
                    content = t.read()
                if len(content) != int(filesize):
                    print(
                        f"Invalid size of {pkgidx_file}, expected {filesize}, skipped"
                    )
                    pkgidx_file.unlink()
                    continue
                if hashlib.sha256(content).hexdigest() != checksum:
                    print(
                        f"Invalid checksum of {pkgidx_file}, expected {checksum}, skipped"
                    )
                    pkgidx_file.unlink()
                    continue
                if pkgidx_content is None and pkgidx_file.stem == "Packages":
                    print(
                        f"getting packages index content from {pkgidx_file.name}",
                        flush=True,
                    )
                    suffix = pkgidx_file.suffix
                    if suffix == ".xz":
                        pkgidx_content = lzma.decompress(content).decode("utf-8")
                    elif suffix == ".bz2":
                        pkgidx_content = bz2.decompress(content).decode("utf-8")
                    elif suffix == ".gz":
                        pkgidx_content = gzip.decompress(content).decode("utf-8")
                    elif suffix == "":
                        pkgidx_content = content.decode("utf-8")
                    else:
                        print("unsupported format")
            # Currently only support SHA-256 checksum, because
            # "Clients may not use the MD5Sum and SHA1 fields for security purposes, and must require a SHA256 or a SHA512 field."
            # from https://wiki.debian.org/DebianRepository/Format#A.22Release.22_files
            if line.startswith("SHA256:"):
                cnt_start = True
    if not cnt_start:
        print("Cannot find SHA-256 checksum")
@@ -219,6 +274,7 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
    except:
        traceback.print_exc()
        return 1

    if arch in ARCH_NO_PKGIDX:
        if collect_tmp_dir() == 1:
            return 1
@@ -227,8 +283,10 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
    if pkgidx_content is None:
        print("index is empty, failed")
        if len(list(pkgidx_dir.glob("Packages*"))) == 0:
            print(
                f"{pkgidx_dir/'Packages'} never existed, upstream may not provide {dist}/{repo}/{arch}, ignore this error"
            )
            return 0
        return 1
@@ -236,7 +294,7 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
    err = 0
    deb_count = 0
    deb_size = 0
    for pkg in pkgidx_content.split("\n\n"):
        if len(pkg) < 10:  # ignore blanks
            continue
        try:
@@ -255,14 +313,14 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
        dest_dir = dest_filename.parent
        if not dest_dir.is_dir():
            dest_dir.mkdir(parents=True, exist_ok=True)
        if dest_filename.suffix == ".deb":
            deb_set[str(dest_filename.relative_to(dest_base_dir))] = pkg_size
        if dest_filename.is_file() and dest_filename.stat().st_size == pkg_size:
            print(f"Skipping {pkg_filename}, size {pkg_size}")
            continue
        pkg_url = f"{base_url}/{pkg_filename}"
        dest_tmp_filename = dest_filename.with_name("._syncing_." + dest_filename.name)
        for retry in range(MAX_RETRY):
            print(f"downloading {pkg_url} to {dest_filename}", flush=True)
            # break  # dry run
@@ -289,13 +347,18 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
    print(f"{deb_count} packages, {deb_size} bytes in total", flush=True)
    return err

def apt_delete_old_debs(dest_base_dir: Path, remote_set: Dict[str, int], dry_run: bool):
    on_disk = set(
        [str(i.relative_to(dest_base_dir)) for i in dest_base_dir.glob("**/*.deb")]
    )
    deleting = on_disk - remote_set.keys()
    # print(on_disk)
    # print(remote_set)
    print(
        f"Deleting {len(deleting)} packages not in the index{' (dry run)' if dry_run else ''}",
        flush=True,
    )
    for i in deleting:
        if dry_run:
            print("Will delete", i)
@@ -303,6 +366,7 @@ def apt_delete_old_debs(dest_base_dir: Path, remote_set: Dict[str, int], dry_run
            print("Deleting", i)
            (dest_base_dir / i).unlink()

def main():
    parser = argparse.ArgumentParser()
@@ -311,31 +375,35 @@ def main():
    parser.add_argument("component", type=str, help="e.g. multiverse,contrib")
    parser.add_argument("arch", type=str, help="e.g. i386,amd64")
    parser.add_argument("working_dir", type=Path, help="working directory")
    parser.add_argument(
        "--delete", action="store_true", help="delete unreferenced package files"
    )
    parser.add_argument(
        "--delete-dry-run",
        action="store_true",
        help="print package files to be deleted only",
    )
    args = parser.parse_args()

    # generate lists of os codenames
    os_list = args.os_version.split(",")
    check_args("os_version", os_list)
    os_list = replace_os_template(os_list)

    # generate a list of components and archs for each os codename
    def generate_list_for_oses(raw: str, name: str) -> List[List[str]]:
        n_os = len(os_list)
        if ":" in raw:
            # specify os codenames for each component
            lists = []
            for l in raw.split(":"):
                list_for_os = l.split(",")
                check_args(name, list_for_os)
                lists.append(list_for_os)
            assert len(lists) == n_os, f"{name} must be specified for each component"
        else:
            # use same os codenames for all components
            l = raw.split(",")
            check_args(name, l)
            lists = [l] * n_os
        return lists
@@ -350,7 +418,12 @@ def main():
    for os, arch_list, comp_list in zip(os_list, arch_lists, component_lists):
        for comp in comp_list:
            for arch in arch_list:
                if (
                    apt_mirror(
                        args.base_url, os, comp, arch, args.working_dir, deb_set=deb_set
                    )
                    != 0
                ):
                    failed.append((os, comp, arch))
    if len(failed) > 0:
        print(f"Failed APT repos of {args.base_url}: ", failed)
@@ -363,5 +436,6 @@ def main():
        total_size = sum(deb_set.values())
        fd.write(f"+{total_size}")

if __name__ == "__main__":
    main()

View File

@@ -1,36 +1,46 @@
#!/usr/bin/env python3
import os
-import sys
import threading
-import traceback
import queue
from pathlib import Path
-from datetime import datetime
import tempfile
-import hashlib
import requests

BASE_URL = os.getenv("TUNASYNC_UPSTREAM_URL", "https://api.github.com/repos/")
WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
MIRROR_BASE_URL = os.getenv(
    "MIRROR_BASE_URL", "https://mirrors.tuna.tsinghua.edu.cn/github-raw/"
)

def raw_to_mirror(s: str) -> str:
    return s.replace("https://raw.githubusercontent.com/", MIRROR_BASE_URL)

def delete_line_with(w: str, s: str) -> str:
    return "\n".join(list(filter(lambda x: x.count(w) == 0, s.splitlines())))

def delete_line_with_gbpdistro(s: str) -> str:
    return delete_line_with("gbpdistro", s)

REPOS = [
    # owner/repo, tree, tree, tree, blob
    ## for stackage
    ["fpco/stackage-content", "master", "stack", "global-hints.yaml"],
    ## for rosdep
    {
        "path": [
            "ros/rosdistro",
            "master",
            "rosdep",
            "sources.list.d",
            "20-default.list",
        ],
        "filter": [raw_to_mirror, delete_line_with_gbpdistro],
    },
    ["ros/rosdistro", "master", "rosdep", "osx-homebrew.yaml"],
    ["ros/rosdistro", "master", "rosdep", "base.yaml"],
    ["ros/rosdistro", "master", "rosdep", "python.yaml"],
@@ -44,36 +54,46 @@ REPOS = [
TIMEOUT_OPTION = (7, 10)
total_size = 0

# wrap around requests.get to use token if available
def github_get(*args, **kwargs):
    headers = kwargs["headers"] if "headers" in kwargs else {}
    if "GITHUB_TOKEN" in os.environ:
        headers["Authorization"] = "token {}".format(os.environ["GITHUB_TOKEN"])
    kwargs["headers"] = headers
    return requests.get(*args, **kwargs)

def github_tree(*args, **kwargs):
    headers = kwargs["headers"] if "headers" in kwargs else {}
    headers["Accept"] = "application/vnd.github.v3+json"
    kwargs["headers"] = headers
    return github_get(*args, **kwargs)

# NOTE blob API supports file up to 100MB
# To get larger one, we need raw.githubcontent, which is not implemented now
def github_blob(*args, **kwargs):
    headers = kwargs["headers"] if "headers" in kwargs else {}
    headers["Accept"] = "application/vnd.github.v3.raw"
    kwargs["headers"] = headers
    return github_get(*args, **kwargs)

def do_download(
    remote_url: str, dst_file: Path, remote_size: int, sha: str, filter=None
):
    # NOTE the stream=True parameter below
    with github_blob(remote_url, stream=True) as r:
        r.raise_for_status()
        tmp_dst_file = None
        try:
            with tempfile.NamedTemporaryFile(
                prefix="." + dst_file.name + ".",
                suffix=".tmp",
                dir=dst_file.parent,
                delete=False,
            ) as f:
                tmp_dst_file = Path(f.name)
                for chunk in r.iter_content(chunk_size=1024**2):
                    if chunk:  # filter out keep-alive new chunks
@@ -82,7 +102,9 @@ def do_download(remote_url: str, dst_file: Path, remote_size: int, sha: str, fil
            # check for downloaded size
            downloaded_size = tmp_dst_file.stat().st_size
            if remote_size != -1 and downloaded_size != remote_size:
                raise Exception(
                    f"File {dst_file.as_posix()} size mismatch: downloaded {downloaded_size} bytes, expected {remote_size} bytes"
                )
            if filter != None:
                with open(tmp_dst_file, "r+") as f:
                    s = f.read()
@@ -108,6 +130,7 @@ def do_download(remote_url: str, dst_file: Path, remote_size: int, sha: str, fil
                if tmp_dst_file.is_file():
                    tmp_dst_file.unlink()

def downloading_worker(q):
    while True:
        item = q.get()
@@ -116,7 +139,7 @@ def downloading_worker(q):
        filter = item.pop(0)  # remove filter
        dst_file = Path("/".join(item))
        dst_file.parent.mkdir(parents=True, exist_ok=True)
        item.pop(0)  # remove working dir
@@ -125,8 +148,8 @@ def downloading_worker(q):
        tree = item.pop(0)
        tree_child = item.pop(0)
        child_is_leaf = False
        url = ""
        sha = ""
        size = 0
        while not child_is_leaf:
            with github_tree(f"{BASE_URL}{owner_repo}/git/trees/{tree}") as r:
@@ -147,8 +170,7 @@ def downloading_worker(q):
                        break
                else:
                    raise Exception
        if not dst_file.is_symlink() or Path(os.readlink(dst_file)).name != sha:
            do_download(url, dst_file, size, sha, filter)
        else:
            print("Skip", dst_file)
@@ -168,12 +190,15 @@ def create_workers(n):
        t.start()
    return task_queue

def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--working-dir", default=WORKING_DIR)
    parser.add_argument(
        "--workers", default=1, type=int, help="number of concurrent downloading jobs"
    )
    args = parser.parse_args()
    if args.working_dir is None:
@@ -198,6 +223,7 @@ def main():
    for i in range(args.workers):
        task_queue.put(None)

if __name__ == "__main__":
    main()

View File

@@ -10,25 +10,30 @@ from pathlib import Path
# mainly from apt-sync.py
FORMULAE_BREW_SH_GITHUB_ACTIONS_ARTIFACT_API = os.getenv(
    "TUNASYNC_UPSTREAM_URL",
    "https://api.github.com/repos/Homebrew/formulae.brew.sh/actions/artifacts?name=github-pages",
)
WORKING_DIR = Path(os.getenv("TUNASYNC_WORKING_DIR", "/data"))
DOWNLOAD_TIMEOUT = int(os.getenv("DOWNLOAD_TIMEOUT", "1800"))

github_api_headers = {
    "Accept": "application/vnd.github+json",
    "X-GitHub-Api-Version": "2022-11-28",
}
if "GITHUB_TOKEN" in os.environ:
    github_api_headers["Authorization"] = "token {}".format(os.environ["GITHUB_TOKEN"])
else:
    # https://github.com/actions/upload-artifact/issues/51
    # the token should have 'public_repo' access
    raise Exception("GITHUB_TOKEN is required")

def formulae_github_pages(zip_file: Path, unzip_directory: Path, tar_directory: Path):
    artifacts = requests.get(
        FORMULAE_BREW_SH_GITHUB_ACTIONS_ARTIFACT_API, headers=github_api_headers
    )
    artifacts.raise_for_status()
    artifacts = artifacts.json()
    latest = None
@@ -40,7 +45,10 @@ def formulae_github_pages(zip_file: Path, unzip_directory: Path, tar_directory:
    check_and_download(zip_url, zip_file, zip_file, github_api_headers)
    sp.run(["unzip", str(zip_file), "-d", str(unzip_directory)])
    sp.run(
        ["tar", "-C", str(tar_directory), "-xf", str(unzip_directory / "artifact.tar")]
    )

def bottles(formula_file: Path):
    b = {}
@@ -49,7 +57,7 @@ def bottles(formula_file: Path):
    for formula in formulae:
        if formula["versions"]["bottle"] and "stable" in formula["bottle"]:
            bs = formula["bottle"]["stable"]
            for platform, v in bs["files"].items():
                sha256 = v["sha256"]
                url = v["url"]
                name = formula["name"]
@@ -63,28 +71,36 @@ def bottles(formula_file: Path):
            }
    return b

ghcr_headers = {
    "Accept": "application/vnd.oci.image.index.v1+json",
    "Authorization": "Bearer QQ==",
}

# borrowed from apt-sync.py
def check_and_download(
    url: str, dst_file: Path, dst_tmp_file: Path, headers=ghcr_headers
):
    if dst_file.is_file():
        return 2  # old file
    try:
        start = time.time()
        with requests.get(url, stream=True, timeout=(5, 10), headers=headers) as r:
            r.raise_for_status()
            if "last-modified" in r.headers:
                remote_ts = parsedate_to_datetime(
                    r.headers["last-modified"]
                ).timestamp()
            else:
                remote_ts = None
            with dst_tmp_file.open("wb") as f:
                for chunk in r.iter_content(chunk_size=1024**2):
                    if time.time() - start > DOWNLOAD_TIMEOUT:
                        raise TimeoutError("Download timeout")
                    if not chunk:
                        continue  # filter out keep-alive new chunks
                    f.write(chunk)
            if remote_ts is not None:
@@ -92,9 +108,11 @@ def check_and_download(url: str, dst_file: Path, dst_tmp_file: Path, headers=ghc
        return 0
    except BaseException as e:
        print(e, flush=True)
        if dst_tmp_file.is_file():
            dst_tmp_file.unlink()
        return 1

if __name__ == "__main__":
    # clean tmp file from previous sync
    TMP_DIR = WORKING_DIR / ".tmp"

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python3
import traceback
import os
-import sys
import subprocess as sp
import tempfile
import argparse
@@ -16,47 +15,50 @@ from pathlib import Path
from typing import List, Dict
import requests

REPO_SIZE_FILE = os.getenv("REPO_SIZE_FILE", "")
DOWNLOAD_TIMEOUT = int(os.getenv("DOWNLOAD_TIMEOUT", "1800"))
REPO_STAT = {}

def calc_repo_size(path: Path):
    dbfiles = path.glob("repodata/*primary.*")
    with tempfile.NamedTemporaryFile() as tmp:
        dec = None
        dbfile = None
        for db in dbfiles:
            dbfile = db
            suffixes = db.suffixes
            if suffixes[-1] == ".bz2":
                dec = bz2.decompress
                suffixes = suffixes[:-1]
            elif suffixes[-1] == ".gz":
                dec = gzip.decompress
                suffixes = suffixes[:-1]
            elif suffixes[-1] in (".sqlite", ".xml"):
                dec = lambda x: x
        if dec is None:
            print(f"Failed to read from {path}: {list(dbfiles)}", flush=True)
            return
        with db.open("rb") as f:
            tmp.write(dec(f.read()))
            tmp.flush()
        if suffixes[-1] == ".sqlite":
            conn = sqlite3.connect(tmp.name)
            c = conn.cursor()
            c.execute("select sum(size_package),count(1) from packages")
            size, cnt = c.fetchone()
            conn.close()
        elif suffixes[-1] == ".xml":
            try:
                tree = ET.parse(tmp.name)
                root = tree.getroot()
                assert root.tag.endswith("metadata")
                cnt, size = 0, 0
                for location in root.findall(
                    "./{http://linux.duke.edu/metadata/common}package/{http://linux.duke.edu/metadata/common}size"
                ):
                    size += int(location.attrib["package"])
                    cnt += 1
            except:
                traceback.print_exc()
@@ -71,21 +73,25 @@ def calc_repo_size(path: Path):
    global REPO_STAT
    REPO_STAT[str(path)] = (size, cnt) if cnt > 0 else (0, 0)  # size can be None

def check_and_download(url: str, dst_file: Path) -> int:
    try:
        start = time.time()
        with requests.get(url, stream=True, timeout=(5, 10)) as r:
            r.raise_for_status()
            if "last-modified" in r.headers:
                remote_ts = parsedate_to_datetime(
                    r.headers["last-modified"]
                ).timestamp()
            else:
                remote_ts = None
            with dst_file.open("wb") as f:
                for chunk in r.iter_content(chunk_size=1024**2):
                    if time.time() - start > DOWNLOAD_TIMEOUT:
                        raise TimeoutError("Download timeout")
                    if not chunk:
                        continue  # filter out keep-alive new chunks
                    f.write(chunk)
            if remote_ts is not None:
@@ -93,13 +99,15 @@ def check_and_download(url: str, dst_file: Path)->int:
        return 0
    except BaseException as e:
        print(e, flush=True)
        if dst_file.is_file():
            dst_file.unlink()
        return 1

def download_repodata(url: str, path: Path) -> int:
    path = path / "repodata"
    path.mkdir(exist_ok=True)
    oldfiles = set(path.glob("*.*"))
    newfiles = set()
    if check_and_download(url + "/repodata/repomd.xml", path / ".repomd.xml") != 0:
        print(f"Failed to download the repomd.xml of {url}")
@@ -107,13 +115,15 @@ def download_repodata(url: str, path: Path) -> int:
    try:
        tree = ET.parse(path / ".repomd.xml")
        root = tree.getroot()
        assert root.tag.endswith("repomd")
        for location in root.findall(
            "./{http://linux.duke.edu/metadata/repo}data/{http://linux.duke.edu/metadata/repo}location"
        ):
            href = location.attrib["href"]
            assert len(href) > 9 and href[:9] == "repodata/"
            fn = path / href[9:]
            newfiles.add(fn)
            if check_and_download(url + "/" + href, fn) != 0:
                print(f"Failed to download the {href}")
                return 1
    except BaseException as e:
@@ -122,49 +132,61 @@
    (path / ".repomd.xml").rename(path / "repomd.xml")  # update the repomd.xml
    newfiles.add(path / "repomd.xml")
    for i in oldfiles - newfiles:
        print(f"Deleting old files: {i}")
        i.unlink()

def check_args(prop: str, lst: List[str]):
    for s in lst:
        if len(s) == 0 or " " in s:
            raise ValueError(f"Invalid item in {prop}: {repr(s)}")

def substitute_vars(s: str, vardict: Dict[str, str]) -> str:
    for key, val in vardict.items():
        tpl = "@{" + key + "}"
        s = s.replace(tpl, val)
    return s

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("base_url", type=str, help="base URL")
    parser.add_argument("os_version", type=str, help="e.g. 7-8,9")
    parser.add_argument(
        "component", type=str, help="e.g. mysql56-community,mysql57-community"
    )
    parser.add_argument("arch", type=str, help="e.g. x86_64,aarch64")
    parser.add_argument("repo_name", type=str, help="e.g. @{comp}-el@{os_ver}")
    parser.add_argument("working_dir", type=Path, help="working directory")
    parser.add_argument(
        "--download-repodata",
        action="store_true",
        help="download repodata files instead of generating them",
    )
    parser.add_argument(
        "--pass-arch-to-reposync",
        action="store_true",
        help="""pass --arch to reposync to further filter packages by 'arch' field in metadata (NOT recommended, prone to missing packages in some repositories, e.g. mysql)""",
    )
    args = parser.parse_args()

    os_list = []
    for os_version in args.os_version.split(","):
        if "-" in os_version and "-stream" not in os_version:
            dash = os_version.index("-")
            os_list = os_list + [
                str(i)
                for i in range(int(os_version[:dash]), 1 + int(os_version[dash + 1 :]))
            ]
        else:
            os_list.append(os_version)
    check_args("os_version", os_list)
    component_list = args.component.split(",")
    check_args("component", component_list)
    arch_list = args.arch.split(",")
    check_args("arch", arch_list)

    failed = []
@@ -175,15 +197,17 @@ def main():
        for os in os_list:
            for comp in component_list:
                vardict = {
                    "arch": arch,
                    "os_ver": os,
                    "comp": comp,
                }
                name = substitute_vars(args.repo_name, vardict)
                url = substitute_vars(args.base_url, vardict)
                try:
                    probe_url = (
                        url + ("" if url.endswith("/") else "/") + "repodata/repomd.xml"
                    )
                    r = requests.head(probe_url, timeout=(7, 7))
                    if r.status_code < 400 or r.status_code == 403:
                        yield (name, url)
@@ -195,19 +219,23 @@ def main():
    for arch in arch_list:
        dest_dirs = []
        conf = tempfile.NamedTemporaryFile("w", suffix=".conf")
        conf.write(
            """
[main]
keepcache=0
"""
        )
        for name, url in combination_os_comp(arch):
            conf.write(
                f"""
[{name}]
name={name}
baseurl={url}
repo_gpgcheck=0
gpgcheck=0
enabled=1
"""
            )
            dst = (args.working_dir / name).absolute()
            dst.mkdir(parents=True, exist_ok=True)
            dest_dirs.append(dst)
@@ -217,13 +245,18 @@ enabled=1
        if len(dest_dirs) == 0:
            print("Nothing to sync", flush=True)
            failed.append(("", arch))
            continue

        cmd_args = [
            "dnf",
            "reposync",
            "-c",
            conf.name,
            "--delete",
            "-p",
            str(args.working_dir.absolute()),
        ]
        if args.pass_arch_to_reposync:
            cmd_args += ["--arch", arch]
        print(f"Launching dnf reposync with command: {cmd_args}", flush=True)
@@ -237,7 +270,16 @@ enabled=1
            if args.download_repodata:
                download_repodata(url, path)
            else:
                cmd_args = [
                    "createrepo_c",
                    "--update",
                    "-v",
                    "-c",
                    cache_dir,
                    "-o",
                    str(path),
                    str(path),
                ]
                print(f"Launching createrepo with command: {cmd_args}", flush=True)
                ret = sp.run(cmd_args)
            calc_repo_size(path)
@@ -250,5 +292,6 @@ enabled=1
        total_size = sum([r[0] for r in REPO_STAT.values()])
        fd.write(f"+{total_size}")

if __name__ == "__main__":
    main()
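Since this commit is meant to change only formatting and drop unused imports, a quick sanity check is to byte-compile every touched script after reformatting. A minimal sketch (the file names are not shown in this view, so the command-line file list is illustrative):

#!/usr/bin/env python3
# Byte-compile each script passed on the command line and fail loudly if the
# reformat introduced a syntax error, e.g.: python3 check_compile.py *.py
import sys
import py_compile

for path in sys.argv[1:]:
    py_compile.compile(path, doraise=True)  # raises py_compile.PyCompileError on failure
print(f"{len(sys.argv) - 1} scripts still compile")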