From 108ea8426faefb98816e353a6fda7f262a40c584 Mon Sep 17 00:00:00 2001
From: Yuxiang Zhang
Date: Sat, 1 Feb 2020 15:38:24 +0800
Subject: [PATCH] docker-ce: multithreaded downloading

---
 docker-ce.py | 74 ++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 58 insertions(+), 16 deletions(-)

diff --git a/docker-ce.py b/docker-ce.py
index 25ada87..8018569 100755
--- a/docker-ce.py
+++ b/docker-ce.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import os
-import subprocess as sp
+import threading
+import queue
 from pathlib import Path
 from email.utils import parsedate_to_datetime
 
@@ -14,6 +15,7 @@ WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
 # connect and read timeout value
 TIMEOUT_OPTION = (7, 10)
 
+
 class RemoteSite:
 
     def __init__(self, base_url=BASE_URL):
@@ -23,8 +25,8 @@ class RemoteSite:
         self.meta_urls = []
 
     def is_metafile_url(self, url):
-        deb_dists=('debian', 'ubuntu', 'raspbian')
-        rpm_dists=('fedora', 'centos')
+        deb_dists = ('debian', 'ubuntu', 'raspbian')
+        rpm_dists = ('fedora', 'centos')
 
         for dist in deb_dists:
             if '/'+dist+'/' not in url:
@@ -76,12 +78,43 @@ class RemoteSite:
             yield from self.recursive_get_filelist(url, filter_meta=False)
 
 
-def curl_download(remote_url: str, dst_file: Path):
-    sp.check_call([
-        "curl", "-o", str(dst_file),
-        "-sL", "--remote-time", "--show-error",
-        "--fail", remote_url,
-    ])
+def requests_download(remote_url: str, dst_file: Path):
+    # NOTE the stream=True parameter below
+    with requests.get(remote_url, stream=True) as r:
+        r.raise_for_status()
+        remote_ts = parsedate_to_datetime(
+            r.headers['last-modified']).timestamp()
+        with open(dst_file, 'wb') as f:
+            for chunk in r.iter_content(chunk_size=1024**2):
+                if chunk:  # filter out keep-alive new chunks
+                    f.write(chunk)
+                    # f.flush()
+        os.utime(dst_file, (remote_ts, remote_ts))
+
+
+def downloading_worker(q):
+    while True:
+        item = q.get()
+        if item is None:
+            break
+
+        url, dst_file = item
+        print("downloading", url, flush=True)
+        try:
+            requests_download(url, dst_file)
+        except Exception:
+            print("Failed to download", url, flush=True)
+            if dst_file.is_file():
+                dst_file.unlink()
+        q.task_done()
+
+
+def create_workers(n):
+    task_queue = queue.Queue()
+    for i in range(n):
+        t = threading.Thread(target=downloading_worker, args=(task_queue, ))
+        t.start()
+    return task_queue
 
 
 def main():
@@ -89,12 +122,17 @@ def main():
     parser = argparse.ArgumentParser()
     parser.add_argument("--base-url", default=BASE_URL)
     parser.add_argument("--working-dir", default=WORKING_DIR)
+    parser.add_argument("--workers", default=1, type=int,
+                        help='number of concurrent downloading jobs')
+    parser.add_argument("--fast-skip", action='store_true',
+                        help='do not verify size and timestamp of existing files')
     args = parser.parse_args()
 
     if args.working_dir is None:
         raise Exception("Working Directory is None")
 
     working_dir = Path(args.working_dir)
+    task_queue = create_workers(args.workers)
 
     remote_filelist = []
     rs = RemoteSite(args.base_url)
@@ -103,6 +141,10 @@ def main():
         remote_filelist.append(dst_file.relative_to(working_dir))
 
         if dst_file.is_file():
+            if args.fast_skip:
+                print("Skipping", dst_file.relative_to(working_dir), flush=True)
+                continue
+
             r = requests.head(url, timeout=TIMEOUT_OPTION)
             remote_filesize = int(r.headers['content-length'])
             remote_date = parsedate_to_datetime(r.headers['last-modified'])
@@ -118,13 +160,13 @@ def main():
         else:
             dst_file.parent.mkdir(parents=True, exist_ok=True)
 
-        print("downloading", url, flush=True)
-        try:
-            curl_download(url, dst_file)
-        except Exception:
-            print("Failed to download", url, flush=True)
-            if dst_file.is_file():
-                dst_file.unlink()
+        task_queue.put((url, dst_file))
+
+    # block until all tasks are done
+    task_queue.join()
+    # stop workers
+    for i in range(args.workers):
+        task_queue.put(None)
 
     local_filelist = []
     for local_file in working_dir.glob('**/*'):