Merge pull request #56 from dramforever/add-nix-channels

Update nix channels
Yuxiang Zhang 2020-03-07 15:31:47 +08:00 committed by GitHub
commit d6545b1faf

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 import hashlib
+import json
 import logging
 import lzma
 import os
@@ -11,6 +12,7 @@ import subprocess
 from pyquery import PyQuery as pq
 from datetime import datetime, timedelta
 from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor
 from urllib3.util.retry import Retry
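
Note: the new ThreadPoolExecutor import backs the parallel downloader added further down in update_channels. A minimal sketch of the same pattern, with a stand-in job function instead of the real try_mirror (names and values here are illustrative only):

    from concurrent.futures import ThreadPoolExecutor

    # Stand-in for try_mirror(index, paths): pretend odd-numbered jobs fail.
    def fake_job(index, name):
        return index % 2 == 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        # map fans the jobs out across the pool; all() is True only if
        # every job reported success, mirroring the channel_failure check.
        results = executor.map(lambda job: fake_job(*job), enumerate(['a', 'b', 'c']))
        ok = all(results)

    print(ok)  # False: the job at index 1 "failed"
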
@@ -26,6 +28,11 @@ else:
 MIRROR_BASE_URL = os.getenv("MIRROR_BASE_URL", 'https://mirrors.tuna.tsinghua.edu.cn/nix-channels')
 WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels')
+PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192))
+THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10))
+DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '1') == '1'
+RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30))
 STORE_DIR = 'store'
 RELEASES_DIR = 'releases'
 CLONE_SINCE = datetime(2018, 12, 1)
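
Note: these four knobs are plain environment-variable overrides with the defaults shown above. A small illustration of how they resolve (the override values below are hypothetical); in particular, DELETE_OLD treats only the literal string '1' as true:

    import os

    # Hypothetical overrides, e.g. set in the tunasync job environment.
    os.environ['NIX_MIRROR_THREADS'] = '4'
    os.environ['NIX_MIRROR_DELETE_OLD'] = 'yes'   # not '1', so deletion stays off

    PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192))    # -> 8192 (default)
    THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10))            # -> 4
    DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '1') == '1'   # -> False
    RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30))  # -> 30.0

    print(PATH_BATCH, THREADS, DELETE_OLD, RETAIN_DAYS)
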
@@ -258,6 +265,9 @@ def clone_channels():
     return channels_to_update

+def hash_part(path):
+    return path.split('/')[-1].split('-', 1)[0]
+
 def update_channels(channels):
     logging.info(f'- Updating binary cache')
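
Note: the new hash_part helper reduces a store path to its leading hash, which the rest of the diff uses as the key for .narinfo files. A quick check with a made-up store path (the hash is not a real one):

    def hash_part(path):
        return path.split('/')[-1].split('-', 1)[0]

    # Nix store paths have the shape /nix/store/<hash>-<name>; hash made up here.
    example = '/nix/store/0c0p6pasd30q2nnnpfk5sxqzxq3c5jjf-hello-2.10'
    assert hash_part(example) == '0c0p6pasd30q2nnnpfk5sxqzxq3c5jjf'
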
@@ -266,9 +276,11 @@ def update_channels(channels):
     for channel in channels:
         logging.info(f' - {channel}')

+        chan_path = working_dir / channel
         chan_path_update = working_dir / f'.{channel}.update'

         upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text()
+        upstream_binary_cache = upstream_binary_cache.rstrip('/')

         # All the channels should have https://cache.nixos.org as binary cache
         # URL. We download nix-cache-info here (once per sync) to avoid
@@ -287,34 +299,144 @@
         logging.info(f' - {len(paths)} paths listed')

-        # xargs can splits up the argument lists and invokes nix copy multiple
-        # times to avoid E2BIG (Argument list too long)
-        nix_process = subprocess.Popen(
-            [ 'xargs', 'nix', 'copy',
-                '--from', upstream_binary_cache,
-                '--to', nix_store_dest,
-                '--verbose'
-            ],
-            universal_newlines=True,
-            stdin=subprocess.PIPE
-        )
-
-        for path in paths:
-            nix_process.stdin.write(path.decode() + '\n')
-
-        retcode = nix_process.wait()
-
-        if retcode == 0:
-            chan_path_update.rename(working_dir / channel)
-            logging.info(f' - Done')
-        else:
-            global failure
-            failure = True
-            logging.info(f' - nix copy failed')
+        todo = {}
+        channel_failure = False
+
+        # Batch paths to avoid E2BIG
+        for i in range(0, len(paths), PATH_BATCH):
+            batch = paths[i : i + PATH_BATCH]
+            process = subprocess.run(
+                [
+                    'nix', 'path-info',
+                    '--store', upstream_binary_cache,
+                    '--recursive', '--json'
+                ] + batch,
+                stdout=subprocess.PIPE
+            )
+            if process.returncode != 0:
+                channel_failure = True
+                logging.info(f' - Error status: {process.returncode}')
+                break
+            else:
+                infos = json.loads(process.stdout)
+                for info in infos:
+                    ha = hash_part(info['path'])
+                    todo[ha] = (info['url'], f'{ha}.narinfo')
+
+        logging.info(f' - {len(todo)} paths to download')
+
+        digits = len(str(len(todo)))
+
+        def try_mirror(index, paths):
+            index += 1
+            prefix = f'[{str(index).rjust(digits)}/{len(todo)}]'
+            try:
+                for path in paths:
+                    url = f'{upstream_binary_cache}/{path}'
+                    dest = working_dir / STORE_DIR / path
+                    if dest.exists(): continue
+                    download(url, dest)
+                    logging.info(f' - {prefix} {path}')
+                return True
+            except (requests.exceptions.ConnectionError, WrongSize):
+                return False
+
+        with ThreadPoolExecutor(max_workers=THREADS) as executor:
+            results = executor.map(
+                lambda job: try_mirror(*job),
+                enumerate(todo.values())
+            )
+
+            if not all(results):
+                channel_failure = True
+
+        if channel_failure:
+            logging.info(f' - Finished with errors, not updating symlink')
+        else:
+            chan_path_update.rename(chan_path)
+            logging.info(f' - Finished with success, symlink updated')
+
+def parse_narinfo(narinfo):
+    res = {}
+    for line in narinfo.splitlines():
+        key, value = line.split(': ', 1)
+        res[key] = value
+    return res
+
+def garbage_collect():
+    logging.info(f'- Collecting garbage')
+
+    time_threshold = datetime.now() - timedelta(days=RETAIN_DAYS)
+
+    last_updated = {}
+    latest = {}
+    alive = set()
+
+    for release in (working_dir / RELEASES_DIR).iterdir():
+        channel = release.name.split('@')[0]
+        date_str = (release / '.released-time').read_text()
+        released_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
+
+        if released_date >= time_threshold:
+            alive.add(release)
+
+        if channel not in last_updated \
+                or last_updated[channel] < released_date:
+            last_updated[channel] = released_date
+            latest[channel] = release
+
+    alive.update(latest.values())
+
+    logging.info(f' - {len(alive)} releases alive')
+
+    closure = set()
+
+    for release in alive:
+        with lzma.open(str(release / 'store-paths.xz')) as f:
+            paths = [ path.rstrip() for path in f ]
+
+        for i in range(0, len(paths), PATH_BATCH):
+            batch = paths[i : i + PATH_BATCH]
+            process = subprocess.run(
+                [
+                    'nix', 'path-info',
+                    '--store', nix_store_dest,
+                    '--recursive'
+                ] + batch,
+                stdout=subprocess.PIPE
+            )
+            for path in process.stdout.decode().splitlines():
+                closure.add(hash_part(path))
+
+    logging.info(f' - {len(closure)} paths in closure')
+
+    deleted = 0
+
+    for path in (working_dir / STORE_DIR).iterdir():
+        if not path.name.endswith('.narinfo'):
+            continue
+
+        hash = path.name.split('.narinfo', 1)[0]
+
+        if hash in closure:
+            continue
+
+        deleted += 1
+
+        if DELETE_OLD:
+            narinfo = parse_narinfo(path.read_text())
+            path.unlink(missing_ok=True)
+            (working_dir / STORE_DIR / narinfo['URL']).unlink(missing_ok=True)
+
+    if DELETE_OLD:
+        logging.info(f' - {deleted} paths deleted')
+    else:
+        logging.info(f' - {deleted} paths now unreachable')
+
 if __name__ == '__main__':
     channels = clone_channels()
     update_channels(channels)
+    garbage_collect()
     if failure:
         sys.exit(1)
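
Note on the new garbage collector: each .narinfo in the store directory is a plain "Key: value" text file whose URL field names the nar archive it describes, which is why garbage_collect unlinks both files together. A minimal sketch of parse_narinfo on a trimmed-down, made-up narinfo body (real files carry more fields such as NarHash and References):

    def parse_narinfo(narinfo):
        res = {}
        for line in narinfo.splitlines():
            key, value = line.split(': ', 1)
            res[key] = value
        return res

    # Made-up example; both hashes below are illustrative placeholders.
    sample = (
        'StorePath: /nix/store/0c0p6pasd30q2nnnpfk5sxqzxq3c5jjf-hello-2.10\n'
        'URL: nar/1bjxbg52l72wxxsyzp5d9bkhr3a4dqgvp8kyvnqh2nkyxcmhlhfl.nar.xz\n'
        'Compression: xz\n'
    )

    info = parse_narinfo(sample)
    print(info['URL'])  # the relative path garbage_collect() deletes alongside the .narinfo
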