diff --git a/nix-channels.py b/nix-channels.py
index 77b38d8..e661eb5 100644
--- a/nix-channels.py
+++ b/nix-channels.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 import hashlib
+import json
 import logging
 import lzma
 import os
@@ -11,6 +12,7 @@ import subprocess
 from pyquery import PyQuery as pq
 from datetime import datetime, timedelta
 from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor
 
 from urllib3.util.retry import Retry
 
@@ -27,6 +29,7 @@ else:
 WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels')
 
 PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192))
+THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10))
 DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '0') == '1'
 RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30))
 
@@ -262,6 +265,9 @@ def clone_channels():
 
     return channels_to_update
 
+def hash_part(path):
+    return path.split('/')[-1].split('-', 1)[0]
+
 def update_channels(channels):
     logging.info(f'- Updating binary cache')
 
@@ -274,6 +280,7 @@ def update_channels(channels):
         chan_path_update = working_dir / f'.{channel}.update'
 
         upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text()
+        upstream_binary_cache = upstream_binary_cache.rstrip('/')
 
         # All the channels should have https://cache.nixos.org as binary cache
         # URL. We download nix-cache-info here (once per sync) to avoid
@@ -292,26 +299,56 @@ def update_channels(channels):
 
         logging.info(f'  - {len(paths)} paths listed')
 
-        # Batch paths to avoid E2BIG
-
+        todo = {}
         channel_failure = False
 
+        # Batch paths to avoid E2BIG
+
         for i in range(0, len(paths), PATH_BATCH):
             batch = paths[i : i + PATH_BATCH]
             process = subprocess.run(
                 [
-                    'nix', 'copy',
-                    '--from', upstream_binary_cache,
-                    '--to', nix_store_dest,
-                    '--verbose'
-                ] + batch
+                    'nix', 'path-info',
+                    '--store', upstream_binary_cache,
+                    '--recursive', '--json'
+                ] + batch,
+                stdout=subprocess.PIPE
             )
             if process.returncode != 0:
-                logging.info(f'    - Error status: {process.returncode}')
                 channel_failure = True
+                logging.info(f'    - Error status: {process.returncode}')
+                break
+            else:
+                infos = json.loads(process.stdout)
+                for info in infos:
+                    ha = hash_part(info['path'])
+                    todo[ha] = (info['url'], f'{ha}.narinfo')
 
-                global failure
-                failure = True
+        logging.info(f'  - {len(todo)} paths to download')
+
+        digits = len(str(len(todo)))
+
+        def try_mirror(index, paths):
+            index += 1
+            prefix = f'[{str(index).rjust(digits)}/{len(todo)}]'
+            try:
+                for path in paths:
+                    url = f'{upstream_binary_cache}/{path}'
+                    dest = working_dir / STORE_DIR / path
+                    if dest.exists(): continue
+                    download(url, dest)
+                    logging.info(f'    - {prefix} {path}')
+                return True
+            except (requests.exceptions.ConnectionError, WrongSize):
+                return False
+
+        with ThreadPoolExecutor(max_workers=THREADS) as executor:
+            results = executor.map(
+                lambda job: try_mirror(*job),
+                enumerate(todo.values())
+            )
+            if not all(results):
+                channel_failure = True
 
         if channel_failure:
             logging.info(f'  - Finished with errors, not updating symlink')
@@ -371,9 +408,7 @@ def garbage_collect():
         )
 
         for path in process.stdout.splitlines():
-            # /nix/store/hash-...
-            #            ^^^^
-            closure.add(path.split('/')[-1].split('-', 1)[0])
+            closure.add(hash_part(path))
 
         logging.info(f'  - {len(closure)} paths in closure')
 
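
A minimal sketch of the thread-pool fan-out that the large update_channels hunk introduces, under the same error-handling contract: each worker downloads one group of paths, returns False instead of raising on network errors, and any False result marks the whole channel sync as failed. The names mirror_one/mirror_all and the use of requests.get are illustrative stand-ins for the script's download() helper and todo.values(), not part of the patch.

    from concurrent.futures import ThreadPoolExecutor

    import requests

    THREADS = 10  # counterpart of NIX_MIRROR_THREADS in the patch

    def mirror_one(url):
        # Stand-in for download(url, dest): fetch one object, report success or failure.
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException:
            # Swallow the error and signal failure, as try_mirror does.
            return False

    def mirror_all(urls):
        # executor.map submits every job up front and yields results in order;
        # all() reports False if any download failed, which the caller treats
        # as a channel failure.
        with ThreadPoolExecutor(max_workers=THREADS) as executor:
            return all(executor.map(mirror_one, urls))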