docker-ce: multithreaded downloading

Author: Yuxiang Zhang, 2020-02-01 15:38:24 +08:00 (committed by GitHub)
parent 253fdbd1da
commit 108ea8426f

@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import os
-import subprocess as sp
+import threading
+import queue
 from pathlib import Path
 from email.utils import parsedate_to_datetime
@@ -14,6 +15,7 @@ WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
 # connect and read timeout value
 TIMEOUT_OPTION = (7, 10)

 class RemoteSite:
     def __init__(self, base_url=BASE_URL):
@@ -23,8 +25,8 @@ class RemoteSite:
         self.meta_urls = []

     def is_metafile_url(self, url):
-        deb_dists=('debian', 'ubuntu', 'raspbian')
-        rpm_dists=('fedora', 'centos')
+        deb_dists = ('debian', 'ubuntu', 'raspbian')
+        rpm_dists = ('fedora', 'centos')

        for dist in deb_dists:
            if '/'+dist+'/' not in url:
@ -76,12 +78,43 @@ class RemoteSite:
yield from self.recursive_get_filelist(url, filter_meta=False) yield from self.recursive_get_filelist(url, filter_meta=False)
def curl_download(remote_url: str, dst_file: Path): def requests_download(remote_url: str, dst_file: Path):
sp.check_call([ # NOTE the stream=True parameter below
"curl", "-o", str(dst_file), with requests.get(remote_url, stream=True) as r:
"-sL", "--remote-time", "--show-error", r.raise_for_status()
"--fail", remote_url, remote_ts = parsedate_to_datetime(
]) r.headers['last-modified']).timestamp()
with open(dst_file, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024**2):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
# f.flush()
os.utime(dst_file, (remote_ts, remote_ts))
def downloading_worker(q):
while True:
item = q.get()
if item is None:
break
url, dst_file = item
print("downloading", url, flush=True)
try:
requests_download(url, dst_file)
except Exception:
print("Failed to download", url, flush=True)
if dst_file.is_file():
dst_file.unlink()
q.task_done()
def create_workers(n):
task_queue = queue.Queue()
for i in range(n):
t = threading.Thread(target=downloading_worker, args=(task_queue, ))
t.start()
return task_queue
def main(): def main():
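Taken together, this hunk replaces the per-file curl subprocess with a streamed requests download (1 MiB iter_content chunks, with os.utime mirroring the remote Last-Modified timestamp onto the file) and a pool of worker threads fed from a queue.Queue. For reference, a minimal, self-contained sketch of that producer/consumer shape; fetch(), NUM_WORKERS and the example URLs below are placeholders invented for illustration, not part of this script:

# Sketch only: mirrors the downloading_worker/create_workers pattern above.
import queue
import threading

NUM_WORKERS = 4

def fetch(url, dst):
    print("downloading", url, "->", dst, flush=True)  # stand-in for requests_download()

def worker(q):
    while True:
        item = q.get()
        if item is None:       # poison pill: this worker exits
            break
        url, dst = item
        try:
            fetch(url, dst)
        finally:
            q.task_done()      # exactly once per get() of a real task

q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q,)) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for task in [("https://example.com/a.deb", "a.deb"), ("https://example.com/b.deb", "b.deb")]:
    q.put(task)

q.join()                       # returns once every queued task reported task_done()
for _ in range(NUM_WORKERS):
    q.put(None)                # one pill per worker so each loop breaks
for t in threads:
    t.join()

The commit's own shutdown (task_queue.join() followed by one None per worker) appears at the end of main() in the last hunk below; the workers never call task_done() for the pill, which is fine because the queue is not joined again afterwards.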
@@ -89,12 +122,17 @@ def main():
     parser = argparse.ArgumentParser()
     parser.add_argument("--base-url", default=BASE_URL)
     parser.add_argument("--working-dir", default=WORKING_DIR)
+    parser.add_argument("--workers", default=1, type=int,
+                        help='number of concurrent downloading jobs')
+    parser.add_argument("--fast-skip", action='store_true',
+                        help='do not verify size and timestamp of existing files')
     args = parser.parse_args()

     if args.working_dir is None:
         raise Exception("Working Directory is None")

     working_dir = Path(args.working_dir)
+    task_queue = create_workers(args.workers)
     remote_filelist = []
     rs = RemoteSite(args.base_url)
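With these options in place, the number of concurrent downloads and the skip behaviour are chosen at invocation time. Assuming the script file is named docker-ce.py (the filename is not shown in this diff) and a working directory of /srv/docker-ce, a run might look like:

    ./docker-ce.py --working-dir /srv/docker-ce --workers 4 --fast-skip

Leaving --workers at its default of 1 keeps the old single-downloader behaviour, just routed through the queue instead of inline calls.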
@@ -103,6 +141,10 @@ def main():
         remote_filelist.append(dst_file.relative_to(working_dir))

         if dst_file.is_file():
+            if args.fast_skip:
+                print("Skipping", dst_file.relative_to(working_dir), flush=True)
+                continue
+
             r = requests.head(url, timeout=TIMEOUT_OPTION)
             remote_filesize = int(r.headers['content-length'])
             remote_date = parsedate_to_datetime(r.headers['last-modified'])
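The new --fast-skip branch continues before the HEAD request above is made, trusting any file that already exists locally. The verification it bypasses lies outside this hunk, but it amounts to comparing the Content-Length and Last-Modified headers with the local size and mtime; a hedged, self-contained sketch of that kind of check (is_up_to_date is a name invented here, not a helper in the script):

from pathlib import Path
from email.utils import parsedate_to_datetime
import requests

def is_up_to_date(url: str, dst_file: Path, timeout=(7, 10)) -> bool:
    # Sketch of the size/timestamp verification that --fast-skip avoids.
    r = requests.head(url, timeout=timeout)
    remote_size = int(r.headers['content-length'])
    remote_ts = parsedate_to_datetime(r.headers['last-modified']).timestamp()
    stat = dst_file.stat()
    return stat.st_size == remote_size and stat.st_mtime == remote_ts

Because requests_download() copies the remote timestamp onto the downloaded file with os.utime, an unchanged file can pass this kind of comparison on the next sync and be skipped even without --fast-skip.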
@@ -118,13 +160,13 @@ def main():
         else:
             dst_file.parent.mkdir(parents=True, exist_ok=True)

-        print("downloading", url, flush=True)
-        try:
-            curl_download(url, dst_file)
-        except Exception:
-            print("Failed to download", url, flush=True)
-            if dst_file.is_file():
-                dst_file.unlink()
+        task_queue.put((url, dst_file))
+
+    # block until all tasks are done
+    task_queue.join()
+    # stop workers
+    for i in range(args.workers):
+        task_queue.put(None)

     local_filelist = []
     for local_file in working_dir.glob('**/*'):