diff --git a/nix-channels.py b/nix-channels.py
index bec8ec9..7253d1a 100755
--- a/nix-channels.py
+++ b/nix-channels.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 import hashlib
+import json
 import logging
 import lzma
 import os
@@ -11,6 +12,7 @@ import subprocess
 from pyquery import PyQuery as pq
 from datetime import datetime, timedelta
 from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor
 
 from urllib3.util.retry import Retry
 
@@ -26,6 +28,11 @@ else:
 MIRROR_BASE_URL = os.getenv("MIRROR_BASE_URL", 'https://mirrors.tuna.tsinghua.edu.cn/nix-channels')
 WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels')
 
+PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192))
+THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10))
+DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '1') == '1'
+RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30))
+
 STORE_DIR = 'store'
 RELEASES_DIR = 'releases'
 CLONE_SINCE = datetime(2018, 12, 1)
@@ -258,6 +265,9 @@ def clone_channels():
 
     return channels_to_update
 
+def hash_part(path):
+    return path.split('/')[-1].split('-', 1)[0]
+
 def update_channels(channels):
     logging.info(f'- Updating binary cache')
 
@@ -266,9 +276,11 @@ def update_channels(channels):
     for channel in channels:
         logging.info(f'  - {channel}')
 
+        chan_path = working_dir / channel
         chan_path_update = working_dir / f'.{channel}.update'
 
         upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text()
+        upstream_binary_cache = upstream_binary_cache.rstrip('/')
 
         # All the channels should have https://cache.nixos.org as binary cache
         # URL. We download nix-cache-info here (once per sync) to avoid
@@ -287,34 +299,144 @@ def update_channels(channels):
         logging.info(f'    - {len(paths)} paths listed')
 
-        # xargs can splits up the argument lists and invokes nix copy multiple
-        # times to avoid E2BIG (Argument list too long)
-        nix_process = subprocess.Popen(
-            [ 'xargs', 'nix', 'copy',
-                '--from', upstream_binary_cache,
-                '--to', nix_store_dest,
-                '--verbose'
-            ],
-            universal_newlines=True,
-            stdin=subprocess.PIPE
-        )
+        todo = {}
+        channel_failure = False
 
-        for path in paths:
-            nix_process.stdin.write(path.decode() + '\n')
+        # Batch paths to avoid E2BIG
 
-        retcode = nix_process.wait()
-
-        if retcode == 0:
-            chan_path_update.rename(working_dir / channel)
-            logging.info(f'  - Done')
+        for i in range(0, len(paths), PATH_BATCH):
+            batch = paths[i : i + PATH_BATCH]
+            process = subprocess.run(
+                [
+                    'nix', 'path-info',
+                    '--store', upstream_binary_cache,
+                    '--recursive', '--json'
+                ] + batch,
+                stdout=subprocess.PIPE
+            )
+            if process.returncode != 0:
+                channel_failure = True
+                logging.info(f'    - Error status: {process.returncode}')
+                break
         else:
-            global failure
-            failure = True
+            infos = json.loads(process.stdout)
+            for info in infos:
+                ha = hash_part(info['path'])
+                todo[ha] = (info['url'], f'{ha}.narinfo')
 
-            logging.info(f'  - nix copy failed')
+            logging.info(f'    - {len(todo)} paths to download')
+
+            digits = len(str(len(todo)))
+
+            def try_mirror(index, paths):
+                index += 1
+                prefix = f'[{str(index).rjust(digits)}/{len(todo)}]'
+                try:
+                    for path in paths:
+                        url = f'{upstream_binary_cache}/{path}'
+                        dest = working_dir / STORE_DIR / path
+                        if dest.exists(): continue
+                        download(url, dest)
+                        logging.info(f'    - {prefix} {path}')
+                    return True
+                except (requests.exceptions.ConnectionError, WrongSize):
+                    return False
+
+            with ThreadPoolExecutor(max_workers=THREADS) as executor:
+                results = executor.map(
+                    lambda job: try_mirror(*job),
+                    enumerate(todo.values())
+                )
+                if not all(results):
+                    channel_failure = True
+
+        if channel_failure:
+            logging.info(f'  - Finished with errors, not updating symlink')
+        else:
+            chan_path_update.rename(chan_path)
+            logging.info(f'  - Finished with success, symlink updated')
+
+def parse_narinfo(narinfo):
+    res = {}
+    for line in narinfo.splitlines():
+        key, value = line.split(': ', 1)
+        res[key] = value
+    return res
+
+def garbage_collect():
+    logging.info(f'- Collecting garbage')
+
+    time_threshold = datetime.now() - timedelta(days=RETAIN_DAYS)
+
+    last_updated = {}
+    latest = {}
+    alive = set()
+
+    for release in (working_dir / RELEASES_DIR).iterdir():
+        channel = release.name.split('@')[0]
+        date_str = (release / '.released-time').read_text()
+        released_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
+
+        if released_date >= time_threshold:
+            alive.add(release)
+
+        if channel not in last_updated \
+                or last_updated[channel] < released_date:
+            last_updated[channel] = released_date
+            latest[channel] = release
+
+    alive.update(latest.values())
+
+    logging.info(f'  - {len(alive)} releases alive')
+
+    closure = set()
+
+    for release in alive:
+        with lzma.open(str(release / 'store-paths.xz')) as f:
+            paths = [ path.rstrip() for path in f ]
+
+        for i in range(0, len(paths), PATH_BATCH):
+            batch = paths[i : i + PATH_BATCH]
+
+            process = subprocess.run(
+                [
+                    'nix', 'path-info',
+                    '--store', nix_store_dest,
+                    '--recursive'
+                ] + batch,
+                stdout=subprocess.PIPE
+            )
+
+            for path in process.stdout.decode().splitlines():
+                closure.add(hash_part(path))
+
+    logging.info(f'  - {len(closure)} paths in closure')
+
+    deleted = 0
+
+    for path in (working_dir / STORE_DIR).iterdir():
+        if not path.name.endswith('.narinfo'):
+            continue
+
+        hash = path.name.split('.narinfo', 1)[0]
+        if hash in closure:
+            continue
+
+        deleted += 1
+
+        if DELETE_OLD:
+            narinfo = parse_narinfo(path.read_text())
+            path.unlink(missing_ok=True)
+            (working_dir / STORE_DIR / narinfo['URL']).unlink(missing_ok=True)
+
+    if DELETE_OLD:
+        logging.info(f'  - {deleted} paths deleted')
+    else:
+        logging.info(f'  - {deleted} paths now unreachable')
 
 if __name__ == '__main__':
     channels = clone_channels()
     update_channels(channels)
+    garbage_collect()
 
     if failure:
         sys.exit(1)
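
The two helpers this patch introduces, hash_part and parse_narinfo, carry much of the new logic: hash_part takes the 32-character store hash off the front of a store path's basename, and parse_narinfo turns the "Key: value" lines of a .narinfo file into a dict so the garbage collector can find the nar URL to unlink alongside it. A minimal sketch of how they behave follows; the store path and narinfo contents are illustrative samples, not values taken from the patch.

# Sketch only: the helpers are copied from the patch above; the sample
# values below are made up for illustration.

def hash_part(path):
    # '/nix/store/<hash>-<name>' -> '<hash>'
    return path.split('/')[-1].split('-', 1)[0]

def parse_narinfo(narinfo):
    # .narinfo files are plain 'Key: value' lines
    res = {}
    for line in narinfo.splitlines():
        key, value = line.split(': ', 1)
        res[key] = value
    return res

example_path = '/nix/store/0c1p5z4kda11n5ck2rkqs4sl2a07pzz7-hello-2.10'
print(hash_part(example_path))   # 0c1p5z4kda11n5ck2rkqs4sl2a07pzz7

example_narinfo = (
    'StorePath: /nix/store/0c1p5z4kda11n5ck2rkqs4sl2a07pzz7-hello-2.10\n'
    'URL: nar/0c1p5z4kda11n5ck2rkqs4sl2a07pzz7.nar.xz\n'
    'Compression: xz\n'
)
info = parse_narinfo(example_narinfo)
print(info['URL'])               # the relative nar path the mirror fetches and later unlinks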