twp-stage rsync support

This commit is contained in:
bigeagle 2015-05-17 14:46:40 +08:00
parent 0f3b7b2248
commit 420a19e197
5 changed files with 124 additions and 34 deletions

View File

@ -40,6 +40,17 @@ command = "sleep 20"
local_dir = "/mnt/sdb1/mirror/archlinux/current/"
# log_file = "/dev/null"
[[mirrors]]
name = "arch3"
provider = "two-stage-rsync"
stage1_profile = "debian"
upstream = "/tmp/rsync_test/src/"
local_dir = "/tmp/rsync_test/dst/"
log_file = "/tmp/rsync_test/log"
# log_file = "/dev/null"
no_delay = true
[[mirrors]]
name = "arch4"
provider = "shell"

View File

@ -54,13 +54,14 @@ def run_job(sema, child_q, manager_q, provider, **settings):
for retry in range(max_retry):
manager_q.put(("UPDATE", (provider.name, status, ctx)))
print("start syncing {}, retry: {}".format(provider.name, retry))
provider.run(ctx=ctx)
status = "success"
try:
provider.run(ctx=ctx)
provider.wait()
except sh.ErrorReturnCode:
status = "fail"
else:
status = "success"
if status == "success":
break

View File

@ -2,7 +2,7 @@
# -*- coding:utf-8 -*-
import os
from datetime import datetime
from .mirror_provider import RsyncProvider, ShellProvider
from .mirror_provider import RsyncProvider, TwoStageRsyncProvider, ShellProvider
from .btrfs_snapshot import BtrfsHook
from .loglimit import LogLimitHook
from .exec_pre_post import CmdExecHook
@ -10,7 +10,7 @@ from .exec_pre_post import CmdExecHook
class MirrorConfig(object):
_valid_providers = set(("rsync", "debmirror", "shell", ))
_valid_providers = set(("rsync", "two-stage-rsync", "shell", ))
def __init__(self, parent, options):
self._parent = parent
@ -60,38 +60,48 @@ class MirrorConfig(object):
return self.__dict__["options"].get(key, None)
def to_provider(self, hooks=[], no_delay=False):
kwargs = {
'name': self.name,
'upstream_url': self.upstream,
'local_dir': self.local_dir,
'log_dir': self.log_dir,
'log_file': self.log_file,
'interval': self.interval,
'hooks': hooks,
}
if self.provider == "rsync":
provider = RsyncProvider(
name=self.name,
upstream_url=self.upstream,
local_dir=self.local_dir,
log_dir=self.log_dir,
useIPv6=self.use_ipv6,
password=self.password,
exclude_file=self.exclude_file,
log_file=self.log_file,
interval=self.interval,
hooks=hooks,
)
kwargs.update({
'useIPv6': self.use_ipv6,
'password': self.password,
'exclude_file': self.exclude_file,
})
provider = RsyncProvider(**kwargs)
elif self.provider == "two-stage-rsync":
kwargs.update({
'useIPv6': self.use_ipv6,
'password': self.password,
'exclude_file': self.exclude_file,
})
provider = TwoStageRsyncProvider(**kwargs)
provider.set_stage1_profile(self.stage1_profile)
elif self.options["provider"] == "shell":
provider = ShellProvider(
name=self.name,
command=self.command,
upstream_url=self.upstream,
local_dir=self.local_dir,
log_dir=self.log_dir,
log_file=self.log_file,
log_stdout=self.options.get("log_stdout", True),
interval=self.interval,
hooks=hooks
)
kwargs.update({
'command': self.command,
'log_stdout': self.options.get("log_stdout", True),
})
provider = ShellProvider(**kwargs)
if not no_delay:
sm = self._parent.status_manager
last_update = sm.get_info(self.name, 'last_update')
if last_update not in (None, '-'):
last_update = datetime.strptime(last_update,
'%Y-%m-%d %H:%M:%S')
last_update = datetime.strptime(
last_update, '%Y-%m-%d %H:%M:%S')
delay = int(last_update.strftime("%s")) \
+ self.interval * 60 - int(datetime.now().strftime("%s"))
if delay < 0:

View File

@ -59,8 +59,8 @@ class MirrorProvider(object):
class RsyncProvider(MirrorProvider):
_default_options = ['-aHvh', '--no-o', '--no-g', '--stats',
'--exclude', '.~tmp~/',
'--delete', '--delete-after', '--delay-updates',
'--exclude .~tmp~/',
'--safe-links', '--timeout=120', '--contimeout=120']
def __init__(self, name, upstream_url, local_dir, log_dir,
@ -100,8 +100,76 @@ class RsyncProvider(MirrorProvider):
if self.password is not None:
new_env["RSYNC_PASSWORD"] = self.password
self.p = sh.rsync(*_args, _env=new_env, _out=log_file, _err=log_file,
_out_bufsize=1, _bg=True)
self.p = sh.rsync(*_args, _env=new_env, _out=log_file,
_err_to_out=True, _out_bufsize=1, _bg=True)
class TwoStageRsyncProvider(RsyncProvider):
_stage1_options = ['-aHvh', '--no-o', '--no-g',
'--exclude', '.~tmp~/',
'--safe-links', '--timeout=120', '--contimeout=120']
_stage2_options = ['-aHvh', '--no-o', '--no-g', '--stats',
'--exclude', '.~tmp~/',
'--delete', '--delete-after', '--delay-updates',
'--safe-links', '--timeout=120', '--contimeout=120']
_stage1_profiles = {
"debian": [
'Packages*', 'Sources*', 'Release*',
'InRelease', 'i18n/*', 'ls-lR*',
]
}
def set_stage1_profile(self, profile):
if profile not in self._stage1_profiles:
raise Exception("Profile Undefined!")
self._stage1_excludes = self._stage1_profiles[profile]
def options(self, stage):
_default_options = self._stage1_options \
if stage == 1 else self._stage2_options
_options = [o for o in _default_options] # copy
if stage == 1:
for _exc in self._stage1_excludes:
_options.append("--exclude")
_options.append(_exc)
if self.useIPv6:
_options.append("-6")
if self.exclude_file:
_options.append("--exclude-from")
_options.append(self.exclude_file)
return _options
def run(self, ctx={}):
working_dir = ctx.get("current_dir", self.local_dir)
log_file = self.get_log_file(ctx)
new_env = os.environ.copy()
if self.password is not None:
new_env["RSYNC_PASSWORD"] = self.password
with open(log_file, 'w', buffering=1) as f:
def log_output(line):
f.write(line)
for stage in (1, 2):
_args = self.options(stage)
_args.append(self.upstream_url)
_args.append(working_dir)
f.write("==== Stage {} Begins ====\n\n".format(stage))
self.p = sh.rsync(
*_args, _env=new_env, _out=log_output,
_err_to_out=True, _out_bufsize=1, _bg=False
)
self.p.wait()
class ShellProvider(MirrorProvider):
@ -133,7 +201,7 @@ class ShellProvider(MirrorProvider):
if self.log_stdout:
self.p = cmd(*_args, _env=new_env, _out=log_file,
_err=log_file, _out_bufsize=1, _bg=True)
_err_to_out=True, _out_bufsize=1, _bg=True)
else:
self.p = cmd(*_args, _env=new_env, _out='/dev/null',
_err='/dev/null', _out_bufsize=1, _bg=True)

View File

@ -82,7 +82,7 @@ class TUNASync(object):
for name, mirror in self.mirrors.iteritems():
hooks = mirror.hooks() + self.hooks()
provider = mirror.to_provider(hooks)
provider = mirror.to_provider(hooks, no_delay=mirror.no_delay)
self._providers[name] = provider
return self._providers