From 00a82c5e4ea9299f09c18a7545fd83b41a42de1b Mon Sep 17 00:00:00 2001 From: dramforever Date: Tue, 3 Mar 2020 14:07:05 +0800 Subject: [PATCH 1/8] nix-channels: Split argument list in script --- nix-channels.py | 73 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/nix-channels.py b/nix-channels.py index bec8ec9..0050e53 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -258,7 +258,56 @@ def clone_channels(): return channels_to_update +def make_xargs(): + arg_max = int(subprocess.run( + [ 'getconf', 'ARG_MAX' ], + stdout=subprocess.PIPE + ).stdout) + + env_size = sum( + len(k.encode()) + len(v.encode()) + 2 + for k, v in os.environ.items() + ) + + # Subtract 1024 to be safe safe + usable = arg_max - env_size - 1024 + + logging.info(f'- mk_xargs: Usable command line bytes: {usable}') + + def check_run(args): + global failure + p = subprocess.run(args) + if p.returncode != 0: + logging.info(f' - Subprocess failed with code {p.returncode}') + failure = True + + def xargs(common, args): + common = [ a.encode() if isinstance(a, str) else a for a in common ] + args = [ a.encode() if isinstance(a, str) else a for a in args ] + + common_size = sum(len(a) + 1 for a in common) + + buf, cur_size = [], common_size + + for a in args: + size = len(a) + 1 + if cur_size + size > usable: + check_run(common + buf) + buf, cur_size = [], common_size + + buf.append(a) + cur_size += size + + assert cur_size <= usable, f'Argument {a[:50]}... will never fit' + + if buf: + check_run(common + buf) + + return xargs + def update_channels(channels): + xargs = make_xargs() + logging.info(f'- Updating binary cache') has_cache_info = False @@ -287,31 +336,17 @@ def update_channels(channels): logging.info(f' - {len(paths)} paths listed') - # xargs can splits up the argument lists and invokes nix copy multiple - # times to avoid E2BIG (Argument list too long) - nix_process = subprocess.Popen( - [ 'xargs', 'nix', 'copy', + xargs( + [ + 'nix', 'copy', '--from', upstream_binary_cache, '--to', nix_store_dest, '--verbose' ], - universal_newlines=True, - stdin=subprocess.PIPE + paths ) - for path in paths: - nix_process.stdin.write(path.decode() + '\n') - - retcode = nix_process.wait() - - if retcode == 0: - chan_path_update.rename(working_dir / channel) - logging.info(f' - Done') - else: - global failure - failure = True - - logging.info(f' - nix copy failed') + logging.info(f' - Finished') if __name__ == '__main__': channels = clone_channels() From 924a828a6a61405ce0ce8ab484af4a22f454d3f4 Mon Sep 17 00:00:00 2001 From: dramforever Date: Tue, 3 Mar 2020 15:05:36 +0800 Subject: [PATCH 2/8] nix-channels: Copy paths in batches --- nix-channels.py | 76 ++++++++++++------------------------------------- 1 file changed, 18 insertions(+), 58 deletions(-) diff --git a/nix-channels.py b/nix-channels.py index 0050e53..492aac4 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -258,56 +258,7 @@ def clone_channels(): return channels_to_update -def make_xargs(): - arg_max = int(subprocess.run( - [ 'getconf', 'ARG_MAX' ], - stdout=subprocess.PIPE - ).stdout) - - env_size = sum( - len(k.encode()) + len(v.encode()) + 2 - for k, v in os.environ.items() - ) - - # Subtract 1024 to be safe safe - usable = arg_max - env_size - 1024 - - logging.info(f'- mk_xargs: Usable command line bytes: {usable}') - - def check_run(args): - global failure - p = subprocess.run(args) - if p.returncode != 0: - logging.info(f' - Subprocess failed with code {p.returncode}') - failure = True - - def xargs(common, args): - common = [ a.encode() if isinstance(a, str) else a for a in common ] - args = [ a.encode() if isinstance(a, str) else a for a in args ] - - common_size = sum(len(a) + 1 for a in common) - - buf, cur_size = [], common_size - - for a in args: - size = len(a) + 1 - if cur_size + size > usable: - check_run(common + buf) - buf, cur_size = [], common_size - - buf.append(a) - cur_size += size - - assert cur_size <= usable, f'Argument {a[:50]}... will never fit' - - if buf: - check_run(common + buf) - - return xargs - def update_channels(channels): - xargs = make_xargs() - logging.info(f'- Updating binary cache') has_cache_info = False @@ -336,15 +287,24 @@ def update_channels(channels): logging.info(f' - {len(paths)} paths listed') - xargs( - [ - 'nix', 'copy', - '--from', upstream_binary_cache, - '--to', nix_store_dest, - '--verbose' - ], - paths - ) + # Batch paths to avoid E2BIG + + PATH_BATCH = 128 + + for i in range(0, len(paths), PATH_BATCH): + batch = paths[i : i + PATH_BATCH] + process = subprocess.run( + [ + 'nix', 'copy', + '--from', upstream_binary_cache, + '--to', nix_store_dest, + '--verbose' + ] + batch + ) + if process.returncode != 0: + logging.info(f' - Error status: {process.returncode}') + global failure + failure = True logging.info(f' - Finished') From d00613ee40b79f0feaead4f3435932474ae7714e Mon Sep 17 00:00:00 2001 From: dramforever Date: Wed, 4 Mar 2020 22:42:45 +0800 Subject: [PATCH 3/8] nix-channels: Fix forgetting to symlink channel --- nix-channels.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nix-channels.py b/nix-channels.py index 492aac4..9dc23fd 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -266,6 +266,7 @@ def update_channels(channels): for channel in channels: logging.info(f' - {channel}') + chan_path = working_dir / channel chan_path_update = working_dir / f'.{channel}.update' upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text() @@ -291,6 +292,8 @@ def update_channels(channels): PATH_BATCH = 128 + channel_failure = False + for i in range(0, len(paths), PATH_BATCH): batch = paths[i : i + PATH_BATCH] process = subprocess.run( @@ -303,10 +306,16 @@ def update_channels(channels): ) if process.returncode != 0: logging.info(f' - Error status: {process.returncode}') + channel_failure = True + global failure failure = True - logging.info(f' - Finished') + if channel_failure: + logging.info(f' - Finished with errors, not updating symlink') + else: + chan_path_update.rename(chan_path) + logging.info(f' - Finished with success, symlink updated') if __name__ == '__main__': channels = clone_channels() From 730e542b48992497f708fe64ae58113aab19761a Mon Sep 17 00:00:00 2001 From: dramforever Date: Fri, 6 Mar 2020 23:32:00 +0800 Subject: [PATCH 4/8] nix-channels: Larger batch size and configurable --- nix-channels.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nix-channels.py b/nix-channels.py index 9dc23fd..427cae0 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -26,6 +26,8 @@ else: MIRROR_BASE_URL = os.getenv("MIRROR_BASE_URL", 'https://mirrors.tuna.tsinghua.edu.cn/nix-channels') WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels') +PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192)) + STORE_DIR = 'store' RELEASES_DIR = 'releases' CLONE_SINCE = datetime(2018, 12, 1) @@ -290,8 +292,6 @@ def update_channels(channels): # Batch paths to avoid E2BIG - PATH_BATCH = 128 - channel_failure = False for i in range(0, len(paths), PATH_BATCH): From d739c1fa5beb23987dcf8269a8de5cae9f710e29 Mon Sep 17 00:00:00 2001 From: dramforever Date: Sat, 7 Mar 2020 00:28:56 +0800 Subject: [PATCH 5/8] nix-channels: Add garbage collection --- nix-channels.py | 83 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/nix-channels.py b/nix-channels.py index 427cae0..77b38d8 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -27,6 +27,8 @@ else: WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels') PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192)) +DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '0') == '1' +RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30)) STORE_DIR = 'store' RELEASES_DIR = 'releases' @@ -317,8 +319,89 @@ def update_channels(channels): chan_path_update.rename(chan_path) logging.info(f' - Finished with success, symlink updated') +def parse_narinfo(narinfo): + res = {} + for line in narinfo.splitlines(): + key, value = line.split(': ', 1) + res[key] = value + return res + +def garbage_collect(): + logging.info(f'- Collecting garbage') + + time_threshold = datetime.now() - timedelta(days=RETAIN_DAYS) + + last_updated = {} + latest = {} + alive = set() + + for release in (working_dir / RELEASES_DIR).iterdir(): + channel = release.name.split('@')[0] + date_str = (release / '.released-time').read_text + released_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') + + if released_date >= time_threshold: + alive.add(release) + + if channel not in last_updated \ + or last_updated[channel] < released_date: + last_updated[channel] = released_date + latest[channel] = release + + alive.update(latest.values()) + + logging.info(f' - {len(alive)} releases alive') + + closure = set() + + for release in alive: + with lzma.open(str(release / 'store-paths.xz')) as f: + paths = [ path.rstrip() for path in f ] + + for i in range(0, len(paths), PATH_BATCH): + batch = paths[i : i + PATH_BATCH] + + process = subprocess.run( + [ + 'nix', 'path-info', + '--store', nix_store_dest, + '--recursive' + ] + batch, + stdout=subprocess.PIPE + ) + + for path in process.stdout.splitlines(): + # /nix/store/hash-... + # ^^^^ + closure.add(path.split('/')[-1].split('-', 1)[0]) + + logging.info(f' - {len(closure)} paths in closure') + + deleted = 0 + + for path in (working_dir / STORE_DIR).iterdir(): + if not path.name.endswith('.narinfo'): + continue + + hash = path.split('.narinfo', 1)[0] + if hash in closure: + continue + + deleted += 1 + + if DELETE_OLD: + narinfo = parse_narinfo(path.read_text) + path.unlink() + (working_dir / STORE_DIR / narinfo['URL']).unlink() + + if DELETE_OLD: + logging.info(f' - {deleted} paths deleted') + else: + logging.info(f' - {deleted} paths now unreachable') + if __name__ == '__main__': channels = clone_channels() update_channels(channels) + garbage_collect() if failure: sys.exit(1) From 35c2897988fe0de2c1a78ae722a4fcfc91e8b5d7 Mon Sep 17 00:00:00 2001 From: dramforever Date: Sat, 7 Mar 2020 02:25:49 +0800 Subject: [PATCH 6/8] nix-channels: Avoid nix copy xz performance issue Get URL from nix store-path and download using a thread pool --- nix-channels.py | 61 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/nix-channels.py b/nix-channels.py index 77b38d8..e661eb5 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import hashlib +import json import logging import lzma import os @@ -11,6 +12,7 @@ import subprocess from pyquery import PyQuery as pq from datetime import datetime, timedelta from pathlib import Path +from concurrent.futures import ThreadPoolExecutor from urllib3.util.retry import Retry @@ -27,6 +29,7 @@ else: WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels') PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192)) +THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10)) DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '0') == '1' RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30)) @@ -262,6 +265,9 @@ def clone_channels(): return channels_to_update +def hash_part(path): + return path.split('/')[-1].split('-', 1)[0] + def update_channels(channels): logging.info(f'- Updating binary cache') @@ -274,6 +280,7 @@ def update_channels(channels): chan_path_update = working_dir / f'.{channel}.update' upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text() + upstream_binary_cache = upstream_binary_cache.rstrip('/') # All the channels should have https://cache.nixos.org as binary cache # URL. We download nix-cache-info here (once per sync) to avoid @@ -292,26 +299,56 @@ def update_channels(channels): logging.info(f' - {len(paths)} paths listed') - # Batch paths to avoid E2BIG - + todo = {} channel_failure = False + # Batch paths to avoid E2BIG + for i in range(0, len(paths), PATH_BATCH): batch = paths[i : i + PATH_BATCH] process = subprocess.run( [ - 'nix', 'copy', - '--from', upstream_binary_cache, - '--to', nix_store_dest, - '--verbose' - ] + batch + 'nix', 'path-info', + '--store', upstream_binary_cache, + '--recursive', '--json' + ] + batch, + stdout=subprocess.PIPE ) if process.returncode != 0: - logging.info(f' - Error status: {process.returncode}') channel_failure = True + logging.info(f' - Error status: {process.returncode}') + break + else: + infos = json.loads(process.stdout) + for info in infos: + ha = hash_part(info['path']) + todo[ha] = (info['url'], f'{ha}.narinfo') - global failure - failure = True + logging.info(f' - {len(todo)} paths to download') + + digits = len(str(len(todo))) + + def try_mirror(index, paths): + index += 1 + prefix = f'[{str(index).rjust(digits)}/{len(todo)}]' + try: + for path in paths: + url = f'{upstream_binary_cache}/{path}' + dest = working_dir / STORE_DIR / path + if dest.exists(): continue + download(url, dest) + logging.info(f' - {prefix} {path}') + return True + except (requests.exceptions.ConnectionError, WrongSize): + return False + + with ThreadPoolExecutor(max_workers=THREADS) as executor: + results = executor.map( + lambda job: try_mirror(*job), + enumerate(todo.values()) + ) + if not all(results): + channel_failure = True if channel_failure: logging.info(f' - Finished with errors, not updating symlink') @@ -371,9 +408,7 @@ def garbage_collect(): ) for path in process.stdout.splitlines(): - # /nix/store/hash-... - # ^^^^ - closure.add(path.split('/')[-1].split('-', 1)[0]) + closure.add(hash_part(path)) logging.info(f' - {len(closure)} paths in closure') From 143744c9341304e289f733f2b6c2abe7958be9d4 Mon Sep 17 00:00:00 2001 From: dramforever Date: Sat, 7 Mar 2020 02:35:19 +0800 Subject: [PATCH 7/8] nix-channels: minor fixes --- nix-channels.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nix-channels.py b/nix-channels.py index e661eb5..de19cf1 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -374,7 +374,7 @@ def garbage_collect(): for release in (working_dir / RELEASES_DIR).iterdir(): channel = release.name.split('@')[0] - date_str = (release / '.released-time').read_text + date_str = (release / '.released-time').read_text() released_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') if released_date >= time_threshold: @@ -407,7 +407,7 @@ def garbage_collect(): stdout=subprocess.PIPE ) - for path in process.stdout.splitlines(): + for path in process.stdout.decode().splitlines(): closure.add(hash_part(path)) logging.info(f' - {len(closure)} paths in closure') @@ -418,16 +418,16 @@ def garbage_collect(): if not path.name.endswith('.narinfo'): continue - hash = path.split('.narinfo', 1)[0] + hash = path.name.split('.narinfo', 1)[0] if hash in closure: continue deleted += 1 if DELETE_OLD: - narinfo = parse_narinfo(path.read_text) - path.unlink() - (working_dir / STORE_DIR / narinfo['URL']).unlink() + narinfo = parse_narinfo(path.read_text()) + path.unlink(missing_ok=True) + (working_dir / STORE_DIR / narinfo['URL']).unlink(missing_ok=True) if DELETE_OLD: logging.info(f' - {deleted} paths deleted') From 07e533de8fd591e19753ad789c4c85b81528ff53 Mon Sep 17 00:00:00 2001 From: dramforever Date: Sat, 7 Mar 2020 04:16:49 +0800 Subject: [PATCH 8/8] nix-channels: Enable GC by default --- nix-channels.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nix-channels.py b/nix-channels.py index de19cf1..7253d1a 100644 --- a/nix-channels.py +++ b/nix-channels.py @@ -30,7 +30,7 @@ else: PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192)) THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10)) -DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '0') == '1' +DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '1') == '1' RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30)) STORE_DIR = 'store'