nix-channels: Avoid nix copy xz performance issue

Get URLs for the nix store paths and download them using a thread pool
dramforever 2020-03-07 02:25:49 +08:00
parent d739c1fa5b
commit 35c2897988

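The change in a nutshell, as a minimal standalone sketch (the CACHE constant, the nar_urls/fetch helpers, and the use of urllib's urlretrieve are illustrative stand-ins, not code from the mirror script): instead of letting `nix copy` unpack and re-compress every path, ask `nix path-info --json` for each path's already-compressed NAR URL, then fetch those files, plus the matching .narinfo metadata, over plain HTTP from a thread pool.

    #!/usr/bin/env python3
    # Sketch of the approach; CACHE, nar_urls and fetch are hypothetical names.
    # Usage: python3 sketch.py /nix/store/<hash>-<name> ...
    import json
    import subprocess
    import sys
    from concurrent.futures import ThreadPoolExecutor
    from urllib.request import urlretrieve  # stand-in for the script's download()

    CACHE = 'https://cache.nixos.org'

    def nar_urls(store_paths):
        # For a binary cache store, `nix path-info --json` reports each path's
        # compressed NAR under a relative 'url' (e.g. 'nar/<hash>.nar.xz').
        out = subprocess.run(
            ['nix', 'path-info', '--store', CACHE, '--recursive', '--json']
            + store_paths,
            stdout=subprocess.PIPE, check=True
        ).stdout
        for info in json.loads(out):
            ha = info['path'].split('/')[-1].split('-', 1)[0]  # store hash part
            yield info['url']        # the compressed NAR itself
            yield f'{ha}.narinfo'    # the metadata file pointing at that NAR

    def fetch(rel_url):
        # A plain HTTP download copies the xz data verbatim: nothing is ever
        # decompressed or recompressed, unlike `nix copy`.
        urlretrieve(f'{CACHE}/{rel_url}', rel_url.replace('/', '_'))

    with ThreadPoolExecutor(max_workers=10) as pool:
        list(pool.map(fetch, nar_urls(sys.argv[1:])))

The actual script in the diff below additionally batches the path-info calls to avoid E2BIG and records per-download failures instead of raising.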

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 import hashlib
+import json
 import logging
 import lzma
 import os
@@ -11,6 +12,7 @@ import subprocess
 from pyquery import PyQuery as pq
 from datetime import datetime, timedelta
 from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor

 from urllib3.util.retry import Retry
@@ -27,6 +29,7 @@ else:
 WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels')
 PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192))
+THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10))
 DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '0') == '1'
 RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30))
@@ -262,6 +265,9 @@ def clone_channels():
     return channels_to_update

+def hash_part(path):
+    return path.split('/')[-1].split('-', 1)[0]
+
 def update_channels(channels):
     logging.info(f'- Updating binary cache')
@@ -274,6 +280,7 @@ def update_channels(channels):
         chan_path_update = working_dir / f'.{channel}.update'

         upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text()
+        upstream_binary_cache = upstream_binary_cache.rstrip('/')

         # All the channels should have https://cache.nixos.org as binary cache
         # URL. We download nix-cache-info here (once per sync) to avoid
@@ -292,26 +299,56 @@ def update_channels(channels):
         logging.info(f' - {len(paths)} paths listed')

-        # Batch paths to avoid E2BIG
+        todo = {}
         channel_failure = False

+        # Batch paths to avoid E2BIG
         for i in range(0, len(paths), PATH_BATCH):
             batch = paths[i : i + PATH_BATCH]

             process = subprocess.run(
                 [
-                    'nix', 'copy',
-                    '--from', upstream_binary_cache,
-                    '--to', nix_store_dest,
-                    '--verbose'
-                ] + batch
+                    'nix', 'path-info',
+                    '--store', upstream_binary_cache,
+                    '--recursive', '--json'
+                ] + batch,
+                stdout=subprocess.PIPE
             )

             if process.returncode != 0:
-                logging.info(f' - Error status: {process.returncode}')
                 channel_failure = True
+                logging.info(f' - Error status: {process.returncode}')
+                break
+            else:
+                infos = json.loads(process.stdout)
+                for info in infos:
+                    ha = hash_part(info['path'])
+                    todo[ha] = (info['url'], f'{ha}.narinfo')

-                global failure
-                failure = True
+        logging.info(f' - {len(todo)} paths to download')
+
+        digits = len(str(len(todo)))
+
+        def try_mirror(index, paths):
+            index += 1
+            prefix = f'[{str(index).rjust(digits)}/{len(todo)}]'
+            try:
+                for path in paths:
+                    url = f'{upstream_binary_cache}/{path}'
+                    dest = working_dir / STORE_DIR / path
+                    if dest.exists(): continue
+                    download(url, dest)
+                    logging.info(f' - {prefix} {path}')
+                return True
+            except (requests.exceptions.ConnectionError, WrongSize):
+                return False
+
+        with ThreadPoolExecutor(max_workers=THREADS) as executor:
+            results = executor.map(
+                lambda job: try_mirror(*job),
+                enumerate(todo.values())
+            )
+
+            if not all(results):
+                channel_failure = True

         if channel_failure:
             logging.info(f' - Finished with errors, not updating symlink')
@@ -371,9 +408,7 @@ def garbage_collect():
     )

     for path in process.stdout.splitlines():
-        # /nix/store/hash-...
-        #            ^^^^
-        closure.add(path.split('/')[-1].split('-', 1)[0])
+        closure.add(hash_part(path))

     logging.info(f' - {len(closure)} paths in closure')
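For context on the two fields the new code reads (a made-up sample for illustration, not output captured from this sync): `nix path-info --json` against a binary-cache store returns one JSON object per store path, with the NAR's location given as a URL relative to the cache root.

    # Made-up sample of one entry from
    #   nix path-info --store https://cache.nixos.org --recursive --json <paths>
    info = {
        'path': '/nix/store/2ipmsm8vp1z05y35hz7is3jspr0dpv1m-hello-2.10',
        'url': 'nar/1dq9p0hnm0f2dy0d0ddcrc2pg1bcb4a8cqk6240vq8g1vdccg9lf.nar.xz',
    }
    # hash_part(info['path']) == '2ipmsm8vp1z05y35hz7is3jspr0dpv1m', so todo maps
    # that hash to both relative URLs: the NAR above and
    # '2ipmsm8vp1z05y35hz7is3jspr0dpv1m.narinfo'; try_mirror then joins each
    # with upstream_binary_cache before downloading.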